From 93deccaaa84036c0fdb8a77ebf08d95d41dfc3a5 Mon Sep 17 00:00:00 2001 From: Kures <14836932+Kures@users.noreply.github.com> Date: Fri, 8 May 2026 00:32:06 +0300 Subject: [PATCH 1/2] docs(api): add OpenAPI 3.1 spec for HTTP API Closes P2-04 from reference/todo.md. Adds docs/openapi.yaml describing all 36 HTTP operations and their request/response shapes, plus docs/api/README.md with viewer and SDK-generation instructions. The spec is reverse-engineered from src/api.zig and src/types.zig on main and validates clean with python -m openapi_spec_validator. Coverage: - 31 paths / 36 operations - 30 schemas (run, step, worker, workflow, checkpoint, tracker, ...) - 5 reusable parameters, 3 reusable responses - security: bearerAuth on everything except /health and /metrics - documents idempotency contract for POST /runs - documents drain mode behavior for POST /admin/drain Drives downstream work: - generated SDKs (TypeScript, Python, Go) via openapi-generator-cli - Swagger UI / Redoc preview - contract tests against a running orchestrator --- docs/api/README.md | 133 ++++ docs/openapi.yaml | 1781 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1914 insertions(+) create mode 100644 docs/api/README.md create mode 100644 docs/openapi.yaml diff --git a/docs/api/README.md b/docs/api/README.md new file mode 100644 index 0000000..8287b13 --- /dev/null +++ b/docs/api/README.md @@ -0,0 +1,133 @@ +# NullBoiler HTTP API + +This directory hosts the HTTP API contract for NullBoiler. + +- **Source of truth:** [`docs/openapi.yaml`](../openapi.yaml) — OpenAPI 3.1 +- **Maintainer roadmap reference:** [`reference/todo.md` P2-04](../../reference/todo.md) + +The spec covers all 36 HTTP operations exposed by `src/api.zig` and all +domain types from `src/types.zig`. 
+ +## At a glance + +| Group | Endpoints | +|---|---| +| Health, Metrics | `GET /health`, `GET /metrics` | +| Runs | `POST /runs`, `GET /runs`, `GET /runs/{id}`, `POST /runs/{id}/{cancel,retry,resume,replay,state}`, `POST /runs/fork` | +| Steps & Events | `GET /runs/{id}/steps`, `GET /runs/{id}/steps/{step_id}`, `GET /runs/{id}/events`, `GET /runs/{id}/stream` (SSE) | +| Checkpoints | `GET /runs/{id}/checkpoints`, `GET /runs/{id}/checkpoints/{cp_id}` | +| Workers | `POST /workers`, `GET /workers`, `DELETE /workers/{id}` | +| Workflows | full CRUD on `/workflows`, plus `validate`, `mermaid`, `run` | +| Tracker bridge | `GET /tracker/{status,tasks,stats,tasks/{id}}`, `POST /tracker/refresh` | +| Admin | `POST /admin/drain`, `GET /rate-limits` | +| Internal | `POST /internal/agent-events/{run_id}/{step_id}` (worker callback) | + +## Quick start + +### View the spec + +```bash +# Redoc (no install — uses npx) +npx @redocly/cli preview-docs docs/openapi.yaml + +# Swagger UI (Docker) +docker run --rm -p 8088:8080 \ + -e SWAGGER_JSON=/spec/openapi.yaml \ + -v "$(pwd)/docs:/spec" \ + swaggerapi/swagger-ui +# then open http://localhost:8088 +``` + +### Validate locally + +```bash +# Python +python -m pip install openapi-spec-validator +python -m openapi_spec_validator docs/openapi.yaml + +# Node +npx @apidevtools/swagger-cli validate docs/openapi.yaml + +# Redocly (also runs lint rules beyond bare OpenAPI) +npx @redocly/cli lint docs/openapi.yaml +``` + +### Generate client SDKs + +The spec is suitable for `openapi-generator-cli`. 
Recommended targets and +generators: + +```bash +# TypeScript (fetch-based, browser & node) +npx @openapitools/openapi-generator-cli generate \ + -i docs/openapi.yaml \ + -g typescript-fetch \ + -o sdks/typescript-fetch \ + --additional-properties=npmName=@nullboiler/client,supportsES6=true,typescriptThreePlus=true + +# Python (httpx async + sync) +npx @openapitools/openapi-generator-cli generate \ + -i docs/openapi.yaml \ + -g python \ + -o sdks/python \ + --additional-properties=packageName=nullboiler_client,projectName=nullboiler-client + +# Go +npx @openapitools/openapi-generator-cli generate \ + -i docs/openapi.yaml \ + -g go \ + -o sdks/go \ + --additional-properties=packageName=nullboiler,withGoMod=true +``` + +For first-class language coverage we recommend publishing each SDK from +its own repository (e.g. `nullboiler/nullboiler-ts-sdk`) and pinning a +spec version per release tag. + +## Conventions + +- **Ids** — opaque strings; do not parse (the spec's examples show UUID-style + values, but the format is not part of the contract). +- **Timestamps** — `*_ms` fields are milliseconds since the Unix epoch + (UTC), `int64`. +- **Errors** — every 4xx/5xx response uses the same envelope: + ```json + {"error": {"code": "<code>", "message": "<message>"}} + ``` + See `ErrorDetail.code` for the closed enum of codes. +- **Idempotency** — `POST /runs` and `POST /workflows/{id}/run` honor + `Idempotency-Key` (preferred) or `idempotency_key` body field. +- **Auth** — bearer token; `/health` and `/metrics` are public so that + load balancers and Prometheus scrapers can reach them without + provisioning a token. + +## Versioning the spec + +The spec carries the same `info.version` as `GET /health` returns. When +the API surface changes: + +1. Update `src/api.zig` and the matching tests. +2. Update `docs/openapi.yaml` and bump `info.version` in lockstep with + the next NullBoiler release. +3. Re-run `python -m openapi_spec_validator docs/openapi.yaml` (or the + Node equivalent) before committing. +4.
Regenerate any vendored SDKs you ship. + +A future enhancement (P2-03 in `reference/todo.md`) is to validate the +spec against a running orchestrator in CI by hitting every endpoint with +a smoke client. + +## Provenance + +This spec was authored from the source of truth files on the `main` +branch: + +- `src/api.zig` — route table (`handleRequest`) and per-handler bodies +- `src/types.zig` — all enums and DB row types +- `src/strategy.zig` — strategy expansion semantics +- `src/workflow_loader.zig` — workflow JSON shape +- `src/workflow_validation.zig` — validation rules +- `src/metrics.zig` — Prometheus exposition (used in `/metrics` example) + +If you change one of those files, update this spec. CI does not yet +diff them, so the discipline is currently social. diff --git a/docs/openapi.yaml b/docs/openapi.yaml new file mode 100644 index 0000000..b96a463 --- /dev/null +++ b/docs/openapi.yaml @@ -0,0 +1,1781 @@ +openapi: 3.1.0 +info: + title: NullBoiler Orchestrator API + summary: Workflow orchestration HTTP API for the Null ecosystem. + description: | + NullBoiler is the orchestration engine of the Null ecosystem. It pulls work + from a tracker (or accepts ad-hoc runs), applies scheduling/routing + strategies, and dispatches steps to compatible worker runtimes (NullClaw, + OpenClaw-compatible, ZeroClaw, PicoClaw bridges). + + This document is the source of truth for the HTTP surface used by: + + - dashboards and operators talking to a single NullBoiler instance, + - automation/SDKs driving runs and workflows, + - integration tests that need to assert request/response shapes, + - generated client SDKs (TypeScript, Python, Go) — see + `docs/api/README.md` for the generator command. + + The spec was reverse-engineered from `src/api.zig` and `src/types.zig` + on the NullBoiler `main` branch and tracks the same version reported by + `GET /health`. 
+ + ## Endpoint availability + + A few endpoints are conditional on configuration or build version: + + - **`/tracker/*`** — only mounted when `tracker` is configured in + `nullboiler.config.json`. Otherwise the orchestrator runs in + tracker-less mode and these paths return `404`. + - **`/workflows/*`** and **`/rate-limits`** — present on `main`. + The published `ghcr.io/nullclaw/nullboiler:2026.3.2` image (built + from an earlier commit) returns `404` for them. They are + documented here because consumers targeting `main` or a future + release will see them. + + ## Authentication + + All endpoints except `GET /health` and `GET /metrics` require a bearer + token when the orchestrator is started with `--api-token` or + `NULLBOILER_API_TOKEN`. When no token is configured, the API is open. + + ## Idempotency + + `POST /runs` accepts an idempotency key either as the + `Idempotency-Key` request header (preferred) or as `idempotency_key` + in the body. Replays of the same payload return `200` with + `idempotent_replay: true`. Replays with a different payload return + `409 conflict`. + + ## Drain mode + + `POST /admin/drain` puts the orchestrator into drain mode. While + draining, `POST /runs` returns `503` and existing runs continue to + completion. Drain mode is process-local and resets on restart. + version: "2026.3.2" + license: + name: MIT + identifier: MIT + contact: + name: NullBoiler maintainers + url: https://github.com/nullclaw/nullboiler + +servers: + - url: http://localhost:8080 + description: Local development (default port from docker-compose) + - url: http://localhost:3000 + description: Local development (default Dockerfile CMD port) + - url: https://{host} + description: Custom deployment + variables: + host: + default: nullboiler.example.com + +tags: + - name: Health + description: Liveness, version, and aggregate health probes. + - name: Metrics + description: Prometheus-format scrape endpoint for `/metrics`. 
+ - name: Runs + description: Workflow run lifecycle — create, inspect, cancel, retry, replay, fork. + - name: Steps + description: Individual step inspection within a run. + - name: Events + description: Run-scoped event feed (polling and SSE streaming). + - name: Checkpoints + description: Run state snapshots used for resume, fork, and replay. + - name: Workers + description: Worker registry — register, list, delete worker runtimes. + - name: Workflows + description: Reusable workflow definitions (CRUD + validate + mermaid + run). + - name: Tracker + description: Bridge to NullTickets when running in tracker-driven mode. + - name: Admin + description: Operational controls — drain mode, rate limit inspection. + - name: Internal + description: Endpoints used by worker runtimes — not for external consumers. + +security: + - bearerAuth: [] + +paths: + /health: + get: + tags: [Health] + summary: Health probe + description: Public health probe. Always returns 200 with current version and counters. + operationId: getHealth + security: [] + responses: + '200': + description: Health snapshot + content: + application/json: + schema: + $ref: '#/components/schemas/HealthResponse' + examples: + ok: + value: + status: ok + version: "2026.3.2" + active_runs: 3 + total_workers: 2 + + /metrics: + get: + tags: [Metrics] + summary: Prometheus metrics + description: | + Prometheus exposition format. Public (no auth) so that scrapers can + reach it without provisioning a token. See `dashboards/grafana/` + for ready-to-import dashboards. + operationId: getMetrics + security: [] + responses: + '200': + description: Metrics in text exposition format + content: + text/plain: + schema: + type: string + example: | + # TYPE nullboiler_http_requests_total counter + nullboiler_http_requests_total 1234 + # TYPE nullboiler_runs_created_total counter + nullboiler_runs_created_total 42 + # ... 
9 more counters + + /admin/drain: + post: + tags: [Admin] + summary: Enter drain mode + description: | + Refuses new runs. Existing runs continue. Drain mode is process-local + and resets on restart. There is no inverse endpoint — restart the + orchestrator to leave drain mode. + operationId: enableDrain + responses: + '200': + description: Drain mode enabled + content: + application/json: + schema: + type: object + required: [status] + properties: + status: + type: string + enum: [draining] + '500': + $ref: '#/components/responses/InternalError' + + /rate-limits: + get: + tags: [Admin] + summary: Inspect per-worker rate limit state + description: | + Returns rate limit state per worker_id. Empty array when no rate + limits are configured or none have been observed yet. Useful for + investigating dispatch backpressure. + operationId: listRateLimits + responses: + '200': + description: Array of per-worker rate limit snapshots + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/RateLimitEntry' + + /workers: + get: + tags: [Workers] + summary: List registered workers + operationId: listWorkers + responses: + '200': + description: Array of workers + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Worker' + '500': + $ref: '#/components/responses/InternalError' + post: + tags: [Workers] + summary: Register a worker + description: | + Registers a worker with the orchestrator. Workers can also be + configured statically in `config.json` — those have `source: config`. + Registered workers have `source: registered`. 
+ operationId: registerWorker + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerRegistration' + responses: + '201': + description: Worker registered + content: + application/json: + schema: + type: object + required: [id, status, protocol] + properties: + id: + type: string + status: + type: string + enum: [active] + protocol: + $ref: '#/components/schemas/WorkerProtocol' + '400': + $ref: '#/components/responses/BadRequest' + '409': + description: Worker id already exists + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + $ref: '#/components/responses/InternalError' + + /workers/{id}: + delete: + tags: [Workers] + summary: Remove a worker + operationId: deleteWorker + parameters: + - $ref: '#/components/parameters/WorkerId' + responses: + '200': + description: Worker removed + content: + application/json: + schema: + type: object + required: [ok] + properties: + ok: + type: boolean + enum: [true] + '500': + $ref: '#/components/responses/InternalError' + + /runs: + post: + tags: [Runs] + summary: Create a workflow run + description: | + Creates a new run from an inline workflow definition. Use + `/workflows/{id}/run` to launch a stored workflow instead. + + Idempotency: pass `Idempotency-Key` header (preferred) or + `idempotency_key` body field. Replays of the exact same payload + return `200` with `idempotent_replay: true`. Different payloads + with the same key return `409 conflict`. 
+ operationId: createRun + parameters: + - name: Idempotency-Key + in: header + required: false + schema: + type: string + minLength: 1 + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreate' + responses: + '200': + description: Idempotent replay of an existing run + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + examples: + replay: + value: + id: r_abc123 + status: completed + idempotent_replay: true + '201': + description: Run created + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + examples: + created: + value: + id: r_abc123 + status: running + idempotent_replay: false + '400': + $ref: '#/components/responses/BadRequest' + '409': + description: Idempotency conflict — same key, different payload + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '503': + description: Orchestrator is in drain mode + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + draining: + value: + error: + code: draining + message: orchestrator is draining and does not accept new runs + '500': + $ref: '#/components/responses/InternalError' + get: + tags: [Runs] + summary: List runs + description: | + Returns a paginated wrapper. Items are the condensed run shape + (id, status, optional idempotency_key, optional workflow_id, + timestamps) — use `GET /runs/{id}` for the full detail with + embedded steps, workflow snapshot, input, callbacks, and state. + operationId: listRuns + parameters: + - name: status + in: query + required: false + schema: + $ref: '#/components/schemas/RunStatus' + description: Filter by run status. + - name: workflow_id + in: query + required: false + schema: + type: string + description: Filter to runs created from a specific stored workflow. 
+ - name: limit + in: query + required: false + schema: + type: integer + minimum: 1 + maximum: 1000 + default: 100 + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + maximum: 1000000000 + default: 0 + responses: + '200': + description: Paginated list of runs (newest first) + content: + application/json: + schema: + $ref: '#/components/schemas/RunListResponse' + examples: + page: + value: + items: + - id: ea25c0ac-9c45-441c-a1ff-d2f4602a3b2e + status: failed + created_at_ms: 1778183970318 + updated_at_ms: 1778183975351 + limit: 100 + offset: 0 + next_offset: 1 + has_more: false + + /runs/{id}: + get: + tags: [Runs] + summary: Get run detail + operationId: getRun + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Run with embedded steps + content: + application/json: + schema: + $ref: '#/components/schemas/RunDetail' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/cancel: + post: + tags: [Runs] + summary: Cancel a run + operationId: cancelRun + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Cancellation accepted + content: + application/json: + schema: + $ref: '#/components/schemas/Run' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/retry: + post: + tags: [Runs] + summary: Retry a failed run + description: Creates a new run from the workflow snapshot of the failed run. 
+ operationId: retryRun + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '201': + description: Retry created + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/steps: + get: + tags: [Steps] + summary: List steps in a run + operationId: listRunSteps + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Array of steps + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Step' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/steps/{step_id}: + get: + tags: [Steps] + summary: Get step detail + operationId: getStep + parameters: + - $ref: '#/components/parameters/RunId' + - $ref: '#/components/parameters/StepId' + responses: + '200': + description: Step detail + content: + application/json: + schema: + $ref: '#/components/schemas/Step' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/events: + get: + tags: [Events] + summary: List run events + description: | + Polling-mode event feed. For continuous tailing, use + `GET /runs/{id}/stream` (SSE). + operationId: listRunEvents + parameters: + - $ref: '#/components/parameters/RunId' + - name: since + in: query + required: false + schema: + type: integer + format: int64 + description: Only return events with `id` greater than this value. + responses: + '200': + description: Array of events ordered by id ascending + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Event' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/stream: + get: + tags: [Events] + summary: Stream run events via SSE + description: | + Server-Sent Events stream of run events. Clients should reconnect + with `Last-Event-ID` to resume after disconnects. 
Stream ends when + the run reaches a terminal status (completed, failed, cancelled). + operationId: streamRunEvents + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: SSE event stream + content: + text/event-stream: + schema: + type: string + description: | + SSE format: each event has `id:`, `event:`, `data:` lines. + `data` is a JSON object with the same shape as `Event`. + + /runs/{id}/checkpoints: + get: + tags: [Checkpoints] + summary: List checkpoints for a run + operationId: listCheckpoints + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Array of checkpoints + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Checkpoint' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/checkpoints/{cp_id}: + get: + tags: [Checkpoints] + summary: Get checkpoint detail + operationId: getCheckpoint + parameters: + - $ref: '#/components/parameters/RunId' + - $ref: '#/components/parameters/CheckpointId' + responses: + '200': + description: Checkpoint with full state + content: + application/json: + schema: + $ref: '#/components/schemas/Checkpoint' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/resume: + post: + tags: [Runs] + summary: Resume run from a checkpoint + operationId: resumeRun + parameters: + - $ref: '#/components/parameters/RunId' + requestBody: + required: true + content: + application/json: + schema: + type: object + required: [checkpoint_id] + properties: + checkpoint_id: + type: string + description: Checkpoint to resume from. + state_overrides: + type: object + additionalProperties: true + description: Optional state values to inject before resuming. 
+ responses: + '200': + description: Run resumed + content: + application/json: + schema: + $ref: '#/components/schemas/Run' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + + /runs/fork: + post: + tags: [Runs] + summary: Fork a run from a checkpoint into a new run + description: | + Creates a new run that branches from the parent's checkpoint. + Useful for what-if exploration without disturbing the parent run. + operationId: forkRun + requestBody: + required: true + content: + application/json: + schema: + type: object + required: [parent_run_id, checkpoint_id] + properties: + parent_run_id: + type: string + checkpoint_id: + type: string + state_overrides: + type: object + additionalProperties: true + responses: + '201': + description: Forked run created + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/state: + post: + tags: [Runs] + summary: Inject state into a running run + description: | + Schedules an injection of state updates into the run. Optionally + delays application until after a specified step completes. + operationId: injectRunState + parameters: + - $ref: '#/components/parameters/RunId' + requestBody: + required: true + content: + application/json: + schema: + type: object + required: [updates] + properties: + updates: + type: object + additionalProperties: true + description: State key/value updates to merge. + apply_after_step: + type: string + description: Optional def_step_id — defer until that step finishes. 
+ responses: + '202': + description: Injection queued + content: + application/json: + schema: + type: object + properties: + queued: { type: boolean, enum: [true] } + injection_id: { type: integer, format: int64 } + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/replay: + post: + tags: [Runs] + summary: Replay a completed run + description: | + Re-executes a run deterministically from its frozen workflow + snapshot. Useful for debugging non-determinism or regression checks. + operationId: replayRun + parameters: + - $ref: '#/components/parameters/RunId' + requestBody: + required: false + content: + application/json: + schema: + type: object + properties: + from_step: + type: string + description: Optional def_step_id to start replay from. + responses: + '201': + description: Replay run created + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + '404': + $ref: '#/components/responses/NotFound' + + /workflows: + post: + tags: [Workflows] + summary: Store a reusable workflow definition + operationId: createWorkflow + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/WorkflowCreate' + responses: + '201': + description: Workflow stored + content: + application/json: + schema: + $ref: '#/components/schemas/Workflow' + '400': + $ref: '#/components/responses/BadRequest' + '409': + description: Workflow id already exists + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + get: + tags: [Workflows] + summary: List stored workflows + operationId: listWorkflows + responses: + '200': + description: Array of workflows (definitions omitted in summary form) + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/WorkflowSummary' + + /workflows/{id}: + get: + tags: [Workflows] + summary: Get workflow with full definition + operationId: getWorkflow + 
parameters: + - $ref: '#/components/parameters/WorkflowId' + responses: + '200': + description: Workflow with definition + content: + application/json: + schema: + $ref: '#/components/schemas/Workflow' + '404': + $ref: '#/components/responses/NotFound' + put: + tags: [Workflows] + summary: Update workflow definition + description: | + Replaces the workflow definition. Increments `version`. Existing + runs already created from older versions are unaffected (they hold + a frozen snapshot of the workflow). + operationId: updateWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/WorkflowUpdate' + responses: + '200': + description: Updated workflow + content: + application/json: + schema: + $ref: '#/components/schemas/Workflow' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + delete: + tags: [Workflows] + summary: Delete a workflow + operationId: deleteWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + responses: + '200': + description: Deleted + content: + application/json: + schema: + type: object + properties: + ok: { type: boolean, enum: [true] } + '404': + $ref: '#/components/responses/NotFound' + + /workflows/{id}/validate: + post: + tags: [Workflows] + summary: Validate a stored workflow definition + description: | + Runs the same validation as `POST /runs` but without creating a + run. Returns a list of validation errors (empty array on success). 
+ operationId: validateWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + responses: + '200': + description: Validation result + content: + application/json: + schema: + $ref: '#/components/schemas/ValidationResult' + '404': + $ref: '#/components/responses/NotFound' + + /workflows/{id}/mermaid: + get: + tags: [Workflows] + summary: Render workflow as Mermaid diagram + description: Returns a Mermaid `flowchart` string suitable for embedding in docs. + operationId: getWorkflowMermaid + parameters: + - $ref: '#/components/parameters/WorkflowId' + responses: + '200': + description: Mermaid source + content: + text/plain: + schema: + type: string + example: | + flowchart TD + start --> plan + plan --> build + build --> review + '404': + $ref: '#/components/responses/NotFound' + + /workflows/{id}/run: + post: + tags: [Workflows] + summary: Launch a run from a stored workflow + description: | + Convenience over `POST /runs` when the workflow definition is + already stored. Idempotency works the same way. 
+ operationId: runWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + - name: Idempotency-Key + in: header + required: false + schema: + type: string + requestBody: + required: false + content: + application/json: + schema: + type: object + properties: + input: + type: object + additionalProperties: true + callbacks: + type: object + additionalProperties: true + idempotency_key: + type: string + responses: + '201': + description: Run created + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + '200': + description: Idempotent replay + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + '503': + description: Drain mode + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + + /tracker/status: + get: + tags: [Tracker] + summary: NullTickets bridge status + description: | + Reports whether NullBoiler is connected to a NullTickets tracker + and basic queue health. Returns a degraded-but-200 status when + no tracker is configured.
+ operationId: getTrackerStatus + responses: + '200': + description: Tracker bridge status + content: + application/json: + schema: + $ref: '#/components/schemas/TrackerStatus' + + /tracker/tasks: + get: + tags: [Tracker] + summary: List active tracker tasks + operationId: listTrackerTasks + responses: + '200': + description: Array of tracker tasks + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/TrackerRun' + + /tracker/tasks/{task_id}: + get: + tags: [Tracker] + summary: Tracker task detail + operationId: getTrackerTask + parameters: + - name: task_id + in: path + required: true + schema: + type: string + responses: + '200': + description: Tracker task detail + content: + application/json: + schema: + $ref: '#/components/schemas/TrackerRun' + '404': + $ref: '#/components/responses/NotFound' + + /tracker/stats: + get: + tags: [Tracker] + summary: Aggregate tracker statistics + operationId: getTrackerStats + responses: + '200': + description: Aggregate counts by state + content: + application/json: + schema: + type: object + additionalProperties: + type: integer + description: Map of `TrackerTaskState` to count. + example: + running: 4 + completed: 192 + failed: 7 + stalled: 1 + + /tracker/refresh: + post: + tags: [Tracker] + summary: Force a tracker poll cycle + description: | + Asks the tracker bridge to immediately claim and dispatch tasks + instead of waiting for the next poll tick. Idempotent — if a poll + is already in-flight, the request is dropped. + operationId: refreshTracker + responses: + '202': + description: Refresh queued + content: + application/json: + schema: + type: object + properties: + queued: { type: boolean, enum: [true] } + + /internal/agent-events/{run_id}/{step_id}: + post: + tags: [Internal] + summary: Worker callback — agent event ingest + description: | + Workers post agent execution events here (tool calls, model + results, status updates). Not for external consumers. 
+ operationId: postAgentEvent + parameters: + - name: run_id + in: path + required: true + schema: + type: string + - name: step_id + in: path + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/AgentEventPayload' + responses: + '200': + description: Event accepted + content: + application/json: + schema: + type: object + properties: + ok: { type: boolean, enum: [true] } + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + +components: + securitySchemes: + bearerAuth: + type: http + scheme: bearer + bearerFormat: token + description: | + Configured via `--api-token` CLI flag or `NULLBOILER_API_TOKEN` + environment variable. Health and metrics endpoints are unauthenticated. + + parameters: + RunId: + name: id + in: path + required: true + schema: + type: string + description: Run identifier returned from `POST /runs`. + StepId: + name: step_id + in: path + required: true + schema: + type: string + CheckpointId: + name: cp_id + in: path + required: true + schema: + type: string + WorkerId: + name: id + in: path + required: true + schema: + type: string + WorkflowId: + name: id + in: path + required: true + schema: + type: string + + responses: + BadRequest: + description: Validation error + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + invalid_json: + value: + error: + code: bad_request + message: invalid JSON body + missing_field: + value: + error: + code: bad_request + message: 'missing required field: steps' + NotFound: + description: Resource not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + not_found: + value: + error: + code: not_found + message: run not found + InternalError: + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 
examples: + oom: + value: + error: + code: internal + message: out of memory + + schemas: + HealthResponse: + type: object + required: [status, version, active_runs, total_workers] + properties: + status: + type: string + enum: [ok] + version: + type: string + example: "2026.3.2" + active_runs: + type: integer + format: int64 + minimum: 0 + total_workers: + type: integer + format: int64 + minimum: 0 + + ErrorResponse: + type: object + required: [error] + properties: + error: + $ref: '#/components/schemas/ErrorDetail' + + ErrorDetail: + type: object + required: [code, message] + properties: + code: + type: string + enum: + - bad_request + - conflict + - not_found + - internal + - draining + - unauthorized + message: + type: string + + RunStatus: + type: string + enum: [pending, running, interrupted, completed, failed, cancelled, forked] + + StepStatus: + type: string + enum: [pending, ready, running, completed, failed, skipped, interrupted] + + StepType: + type: string + enum: [task, route, interrupt, agent, send, transform, subgraph] + + WorkerStatus: + type: string + enum: [active, draining, dead] + + WorkerSource: + type: string + enum: [config, registered] + + WorkerProtocol: + type: string + enum: [webhook, api_chat, openai_chat, mqtt, redis_stream, a2a] + description: | + - `webhook` — requires URL with explicit path (e.g. 
`/webhook`) + - `api_chat` / `openai_chat` — OpenAI-compatible chat completion + - `mqtt` — pub/sub MQTT broker + - `redis_stream` — Redis Streams + - `a2a` — Google Agent-to-Agent protocol + + TrackerTaskState: + type: string + enum: + - claiming + - workspace_setup + - spawning + - running + - completing + - completed + - failed + - stalled + - cooldown + - removing + + ReducerType: + type: string + enum: [last_value, append, merge, add, min, max, add_messages] + + Worker: + type: object + required: [id, url, protocol, model, tags, max_concurrent, source, status, consecutive_failures, created_at_ms] + properties: + id: { type: string } + url: { type: string, format: uri } + protocol: { $ref: '#/components/schemas/WorkerProtocol' } + model: + type: string + nullable: true + tags: + type: array + items: + type: string + max_concurrent: + type: integer + format: int64 + minimum: 1 + source: { $ref: '#/components/schemas/WorkerSource' } + status: { $ref: '#/components/schemas/WorkerStatus' } + consecutive_failures: + type: integer + format: int64 + minimum: 0 + circuit_open_until_ms: + type: integer + format: int64 + nullable: true + last_error_text: + type: string + nullable: true + created_at_ms: + type: integer + format: int64 + + WorkerRegistration: + type: object + required: [id, url] + properties: + id: + type: string + minLength: 1 + example: claw-1 + url: + type: string + format: uri + description: Must include explicit path for `webhook` protocol (e.g. `http://nullclaw:3000/webhook`). + example: http://nullclaw:3000/webhook + token: + type: string + default: "" + protocol: + allOf: + - $ref: '#/components/schemas/WorkerProtocol' + default: webhook + model: + type: string + description: Required when `protocol` is `openai_chat`. + tags: + type: array + items: + type: string + default: [] + max_concurrent: + type: integer + minimum: 1 + default: 1 + + RunSummary: + type: object + description: | + Condensed run shape returned in list responses. 
Optional + `idempotency_key` and `workflow_id` fields are present only when + non-null on the row. + required: [id, status, created_at_ms, updated_at_ms] + properties: + id: { type: string } + status: { $ref: '#/components/schemas/RunStatus' } + idempotency_key: + type: string + description: Present only when set on the run. + workflow_id: + type: string + description: Present only when the run was launched from a stored workflow. + created_at_ms: { type: integer, format: int64 } + updated_at_ms: { type: integer, format: int64 } + + RunListResponse: + type: object + required: [items, limit, offset, next_offset, has_more] + properties: + items: + type: array + items: + $ref: '#/components/schemas/RunSummary' + limit: + type: integer + format: int64 + minimum: 1 + maximum: 1000 + description: Page size echoed back. + offset: + type: integer + format: int64 + minimum: 0 + description: Page offset echoed back. + next_offset: + type: integer + format: int64 + description: | + Pass as `offset` to the next request. Equals + `offset + items.length` on the last page. + has_more: + type: boolean + description: | + True when at least one more page exists. Computed by + fetching `limit + 1` rows and checking the overflow. 
+ + Run: + type: object + required: [id, status, workflow_id, created_at_ms, updated_at_ms] + properties: + id: { type: string } + idempotency_key: + type: string + nullable: true + status: { $ref: '#/components/schemas/RunStatus' } + workflow_id: + type: string + nullable: true + error_text: + type: string + nullable: true + created_at_ms: { type: integer, format: int64 } + updated_at_ms: { type: integer, format: int64 } + started_at_ms: + type: integer + format: int64 + nullable: true + ended_at_ms: + type: integer + format: int64 + nullable: true + parent_run_id: + type: string + nullable: true + + RunDetail: + allOf: + - $ref: '#/components/schemas/Run' + - type: object + properties: + workflow: + type: object + additionalProperties: true + description: Frozen workflow snapshot for this run. + input: + type: object + additionalProperties: true + callbacks: + type: object + additionalProperties: true + state: + type: object + additionalProperties: true + steps: + type: array + items: + $ref: '#/components/schemas/Step' + + RunCreate: + type: object + required: [steps] + properties: + steps: + type: array + minItems: 1 + items: + $ref: '#/components/schemas/StepDefinition' + strategy: + type: string + description: Name of a registered strategy (e.g. `sequential`, `parallel`). + idempotency_key: + type: string + minLength: 1 + input: + type: object + additionalProperties: true + callbacks: + type: object + additionalProperties: true + config: + type: object + additionalProperties: true + description: Run-scoped config (`tracker_url`, `tracker_api_token`, etc.). + reduce: + type: object + description: Optional reducer step appended after parallel strategy. 
+ properties: + id: + type: string + default: __reduce + prompt_template: + type: string + + RunCreated: + type: object + required: [id, status, idempotent_replay] + properties: + id: { type: string } + status: { $ref: '#/components/schemas/RunStatus' } + idempotent_replay: + type: boolean + + StepDefinition: + type: object + required: [id] + properties: + id: + type: string + description: Definition step id (unique within the workflow). + type: + allOf: + - $ref: '#/components/schemas/StepType' + default: task + depends_on: + type: array + items: + type: string + description: Definition step ids this step depends on. + retry: + type: object + properties: + max_attempts: + type: integer + minimum: 1 + default: 1 + timeout_ms: + type: integer + format: int64 + minimum: 0 + prompt_template: + type: string + worker: + type: string + description: Constrain dispatch to a specific worker id. + tags: + type: array + items: { type: string } + description: Worker-tag selector. + output_key: + type: string + output_mapping: + type: object + additionalProperties: true + items_key: + type: string + description: For `send` (fan-out) steps — state key holding the items array. + steps: + type: array + items: + $ref: '#/components/schemas/StepDefinition' + description: For `subgraph` or strategy-nested steps. 
+ additionalProperties: true + + Step: + type: object + required: + - id + - run_id + - def_step_id + - type + - status + - attempt + - max_attempts + - created_at_ms + - updated_at_ms + - iteration_index + properties: + id: { type: string } + run_id: { type: string } + def_step_id: { type: string } + type: { $ref: '#/components/schemas/StepType' } + status: { $ref: '#/components/schemas/StepStatus' } + worker_id: + type: string + nullable: true + input: + type: object + additionalProperties: true + output: + type: object + additionalProperties: true + nullable: true + error_text: + type: string + nullable: true + attempt: { type: integer, format: int64, minimum: 1 } + max_attempts: { type: integer, format: int64, minimum: 1 } + timeout_ms: + type: integer + format: int64 + nullable: true + next_attempt_at_ms: + type: integer + format: int64 + nullable: true + parent_step_id: + type: string + nullable: true + item_index: + type: integer + format: int64 + nullable: true + created_at_ms: { type: integer, format: int64 } + updated_at_ms: { type: integer, format: int64 } + started_at_ms: + type: integer + format: int64 + nullable: true + ended_at_ms: + type: integer + format: int64 + nullable: true + child_run_id: + type: string + nullable: true + iteration_index: { type: integer, format: int64 } + + Event: + type: object + required: [id, run_id, kind, ts_ms] + properties: + id: { type: integer, format: int64 } + run_id: { type: string } + step_id: + type: string + nullable: true + kind: + type: string + description: Event kind (e.g. `step.started`, `step.completed`, `run.completed`). 
+ data: + type: object + additionalProperties: true + ts_ms: { type: integer, format: int64 } + + Checkpoint: + type: object + required: [id, run_id, step_id, version, created_at_ms] + properties: + id: { type: string } + run_id: { type: string } + step_id: { type: string } + parent_id: + type: string + nullable: true + state: + type: object + additionalProperties: true + completed_nodes: + type: array + items: + type: string + version: { type: integer, format: int64 } + metadata: + type: object + additionalProperties: true + nullable: true + created_at_ms: { type: integer, format: int64 } + + Workflow: + type: object + required: [id, name, definition, version, created_at_ms, updated_at_ms] + properties: + id: { type: string } + name: { type: string } + definition: + type: object + required: [steps] + properties: + steps: + type: array + items: + $ref: '#/components/schemas/StepDefinition' + strategy: + type: string + version: { type: integer, format: int64, minimum: 1 } + created_at_ms: { type: integer, format: int64 } + updated_at_ms: { type: integer, format: int64 } + + WorkflowSummary: + type: object + required: [id, name, version, created_at_ms, updated_at_ms] + properties: + id: { type: string } + name: { type: string } + version: { type: integer, format: int64 } + created_at_ms: { type: integer, format: int64 } + updated_at_ms: { type: integer, format: int64 } + + WorkflowCreate: + type: object + required: [id, name, definition] + properties: + id: + type: string + minLength: 1 + name: + type: string + minLength: 1 + definition: + type: object + required: [steps] + properties: + steps: + type: array + minItems: 1 + items: + $ref: '#/components/schemas/StepDefinition' + strategy: + type: string + + WorkflowUpdate: + type: object + required: [definition] + properties: + name: + type: string + definition: + type: object + required: [steps] + properties: + steps: + type: array + minItems: 1 + items: + $ref: '#/components/schemas/StepDefinition' + strategy: + 
type: string + + ValidationResult: + type: object + required: [valid, errors] + properties: + valid: + type: boolean + errors: + type: array + items: + type: object + required: [code, message] + properties: + code: + type: string + message: + type: string + path: + type: string + description: JSON pointer into the workflow definition. + + RateLimitEntry: + type: object + required: [worker_id, remaining, limit, reset_ms, updated_at_ms] + properties: + worker_id: { type: string } + remaining: { type: integer, format: int64, minimum: 0 } + limit: { type: integer, format: int64, minimum: 0 } + reset_ms: { type: integer, format: int64 } + updated_at_ms: { type: integer, format: int64 } + + TrackerStatus: + type: object + required: [enabled] + properties: + enabled: { type: boolean } + url: + type: string + format: uri + nullable: true + last_poll_ms: + type: integer + format: int64 + nullable: true + active_leases: { type: integer, format: int64 } + message: + type: string + description: Human-readable status, especially when `enabled=false`. 
+ + TrackerRun: + type: object + required: + - task_id + - tracker_run_id + - boiler_run_id + - lease_id + - pipeline_id + - agent_role + - task_title + - task_stage + - task_version + - artifact_kind + - state + - claimed_at_ms + properties: + task_id: { type: string } + tracker_run_id: { type: string } + boiler_run_id: { type: string } + lease_id: { type: string } + pipeline_id: { type: string } + agent_role: { type: string } + task_title: { type: string } + task_stage: { type: string } + task_version: { type: integer, format: int64 } + success_trigger: + type: string + nullable: true + artifact_kind: { type: string } + state: { $ref: '#/components/schemas/TrackerTaskState' } + claimed_at_ms: { type: integer, format: int64 } + last_heartbeat_ms: + type: integer + format: int64 + nullable: true + lease_expires_at_ms: + type: integer + format: int64 + nullable: true + completed_at_ms: + type: integer + format: int64 + nullable: true + last_error_text: + type: string + nullable: true + + AgentEventPayload: + type: object + required: [iteration, status] + properties: + iteration: + type: integer + format: int64 + minimum: 0 + tool: + type: string + nullable: true + args: + type: object + additionalProperties: true + nullable: true + result_text: + type: string + nullable: true + status: + type: string + enum: [tool_call, tool_result, model_step, error, completed] From 67969651824c510694e4ee326f7a5263eac5eb87 Mon Sep 17 00:00:00 2001 From: Igor Somov Date: Wed, 13 May 2026 09:54:10 -0300 Subject: [PATCH 2/2] fix(api-docs): align OpenAPI spec with current API --- CLAUDE.md | 17 +- docs/api/README.md | 11 +- docs/openapi.yaml | 646 +++++++++++++++++++++++++++------------------ src/api.zig | 35 ++- 4 files changed, 430 insertions(+), 279 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index ab961ee..3482438 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -15,7 +15,7 @@ Graph-based workflow orchestrator with unified state model for NullClaw AI bot a | File | Role | 
|------|------| | `main.zig` | CLI args (`--port`, `--db`, `--config`, `--version`, `--export-manifest`, `--from-json`), HTTP accept loop, engine thread, tracker thread | -| `api.zig` | REST API routing and 30+ endpoint handlers (runs, workers, workflows, checkpoints, state, SSE stream, tracker) | +| `api.zig` | REST API routing and 30+ endpoint handlers (runs, workers, workflows, checkpoints, state, stream snapshots, tracker) | | `store.zig` | SQLite layer, CRUD methods for all tables, schema migrations (4 migration files) | | `engine.zig` | Graph-based state scheduler: tick loop, 7 node type handlers, checkpoints, reducers, goto, breakpoints, deferred nodes, reconciliation | | `state.zig` | Unified state model: 7 reducer types (last_value, append, merge, add, min, max, add_messages), overwrite bypass, ephemeral keys, state path resolution | @@ -46,7 +46,7 @@ Graph-based workflow orchestrator with unified state model for NullClaw AI bot a ```sh zig build # build -zig build test # unit tests (320 tests) +zig build test # unit tests (355 tests) zig build && bash tests/test_e2e.sh # e2e tests (requires Python 3 for mock workers) ./zig-out/bin/nullboiler --port 8080 --db nullboiler.db --config config.json ``` @@ -68,8 +68,8 @@ zig build && bash tests/test_e2e.sh # e2e tests (requires Python 3 for mock wo | POST | `/workers` | Register worker | | GET | `/workers` | List workers | | DELETE | `/workers/{id}` | Remove worker | -| POST | `/runs` | Create workflow run (legacy step-array or graph format) | -| GET | `/runs` | List runs (supports ?status= filter) | +| POST | `/runs` | Create workflow run from legacy step-array format | +| GET | `/runs` | List runs (supports ?status= and ?workflow_id= filters) | | GET | `/runs/{id}` | Get run details | | POST | `/runs/{id}/cancel` | Cancel run | | POST | `/runs/{id}/retry` | Retry failed run | @@ -82,7 +82,7 @@ zig build && bash tests/test_e2e.sh # e2e tests (requires Python 3 for mock wo | GET | `/runs/{id}/events` | List run 
events | | GET | `/runs/{id}/checkpoints` | List checkpoints for run | | GET | `/runs/{id}/checkpoints/{cpId}` | Get checkpoint details | -| GET | `/runs/{id}/stream` | SSE stream (supports ?mode=values\|updates\|tasks\|debug) | +| GET | `/runs/{id}/stream` | JSON stream snapshot (supports ?mode=values\|updates\|tasks\|debug\|custom and ?after_seq=) | | POST | `/workflows` | Create workflow definition | | GET | `/workflows` | List workflow definitions | | GET | `/workflows/{id}` | Get workflow definition | @@ -136,9 +136,12 @@ MQTT listener: (conditional, for async MQTT workers) Redis listener: (conditional, for async Redis workers) ``` -### SSE Streaming +### Stream Snapshots -5 modes for real-time consumption via `GET /runs/{id}/stream?mode=X`: +`GET /runs/{id}/stream` currently returns a JSON snapshot containing persisted +events and buffered in-memory stream events. It supports `mode=X` filtering and +`after_seq=N` cursors for independent consumers. The internal stream hub uses 5 +modes: - `values` -- full state after each step - `updates` -- node name + partial state updates - `tasks` -- task start/finish with metadata diff --git a/docs/api/README.md b/docs/api/README.md index 8287b13..39323fd 100644 --- a/docs/api/README.md +++ b/docs/api/README.md @@ -14,7 +14,7 @@ domain types from `src/types.zig`. 
|---|---| | Health, Metrics | `GET /health`, `GET /metrics` | | Runs | `POST /runs`, `GET /runs`, `GET /runs/{id}`, `POST /runs/{id}/{cancel,retry,resume,replay,state}`, `POST /runs/fork` | -| Steps & Events | `GET /runs/{id}/steps`, `GET /runs/{id}/steps/{step_id}`, `GET /runs/{id}/events`, `GET /runs/{id}/stream` (SSE) | +| Steps & Events | `GET /runs/{id}/steps`, `GET /runs/{id}/steps/{step_id}`, `GET /runs/{id}/events`, `GET /runs/{id}/stream` (JSON stream snapshot) | | Checkpoints | `GET /runs/{id}/checkpoints`, `GET /runs/{id}/checkpoints/{cp_id}` | | Workers | `POST /workers`, `GET /workers`, `DELETE /workers/{id}` | | Workflows | full CRUD on `/workflows`, plus `validate`, `mermaid`, `run` | @@ -95,8 +95,9 @@ spec version per release tag. {"error": {"code": "", "message": ""}} ``` See `ErrorDetail.code` for the closed enum of codes. -- **Idempotency** — `POST /runs` and `POST /workflows/{id}/run` honor - `Idempotency-Key` (preferred) or `idempotency_key` body field. +- **Idempotency** — `POST /runs` honors `Idempotency-Key` (preferred) or + `idempotency_key` body field. Stored-workflow launches via + `POST /workflows/{id}/run` do not currently implement idempotency. - **Auth** — bearer token; `/health` and `/metrics` are public so that load balancers and Prometheus scrapers can reach them without provisioning a token. @@ -125,8 +126,8 @@ branch: - `src/api.zig` — route table (`handleRequest`) and per-handler bodies - `src/types.zig` — all enums and DB row types - `src/strategy.zig` — strategy expansion semantics -- `src/workflow_loader.zig` — workflow JSON shape -- `src/workflow_validation.zig` — validation rules +- `src/workflow_validation.zig` and `src/engine.zig` — graph workflow shape + and validation rules - `src/metrics.zig` — Prometheus exposition (used in `/metrics` example) If you change one of those files, update this spec. 
CI does not yet diff --git a/docs/openapi.yaml b/docs/openapi.yaml index b96a463..75a9b3e 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -22,16 +22,11 @@ info: ## Endpoint availability - A few endpoints are conditional on configuration or build version: + A few endpoints are conditional on runtime configuration: - **`/tracker/*`** — only mounted when `tracker` is configured in `nullboiler.config.json`. Otherwise the orchestrator runs in tracker-less mode and these paths return `404`. - - **`/workflows/*`** and **`/rate-limits`** — present on `main`. - The published `ghcr.io/nullclaw/nullboiler:2026.3.2` image (built - from an earlier commit) returns `404` for them. They are - documented here because consumers targeting `main` or a future - release will see them. ## Authentication @@ -81,7 +76,7 @@ tags: - name: Steps description: Individual step inspection within a run. - name: Events - description: Run-scoped event feed (polling and SSE streaming). + description: Run-scoped event feed and stream snapshots. - name: Checkpoints description: Run state snapshots used for resume, fork, and replay. - name: Workers @@ -127,8 +122,7 @@ paths: summary: Prometheus metrics description: | Prometheus exposition format. Public (no auth) so that scrapers can - reach it without provisioning a token. See `dashboards/grafana/` - for ready-to-import dashboards. + reach it without provisioning a token. operationId: getMetrics security: [] responses: @@ -430,9 +424,11 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Run' + $ref: '#/components/schemas/RunStatusResponse' '404': $ref: '#/components/responses/NotFound' + '409': + $ref: '#/components/responses/Conflict' '500': $ref: '#/components/responses/InternalError' @@ -440,19 +436,21 @@ paths: post: tags: [Runs] summary: Retry a failed run - description: Creates a new run from the workflow snapshot of the failed run. + description: Resets failed steps to ready and moves the run back to `running`. 
operationId: retryRun parameters: - $ref: '#/components/parameters/RunId' responses: - '201': - description: Retry created + '200': + description: Run moved back to running content: application/json: schema: - $ref: '#/components/schemas/RunCreated' + $ref: '#/components/schemas/RunStatusResponse' '404': $ref: '#/components/responses/NotFound' + '409': + $ref: '#/components/responses/Conflict' '500': $ref: '#/components/responses/InternalError' @@ -498,18 +496,10 @@ paths: tags: [Events] summary: List run events description: | - Polling-mode event feed. For continuous tailing, use - `GET /runs/{id}/stream` (SSE). + Polling-mode event feed backed by persisted run events. operationId: listRunEvents parameters: - $ref: '#/components/parameters/RunId' - - name: since - in: query - required: false - schema: - type: integer - format: int64 - description: Only return events with `id` greater than this value. responses: '200': description: Array of events ordered by id ascending @@ -519,30 +509,48 @@ paths: type: array items: $ref: '#/components/schemas/Event' - '404': - $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' /runs/{id}/stream: get: tags: [Events] - summary: Stream run events via SSE + summary: Snapshot run stream events description: | - Server-Sent Events stream of run events. Clients should reconnect - with `Last-Event-ID` to resume after disconnects. Stream ends when - the run reaches a terminal status (completed, failed, cancelled). + Returns the current run status/state, persisted events, and buffered + in-memory stream events as a regular JSON response. Use `after_seq` + to page through buffered stream events without sharing cursor state + between consumers. 
operationId: streamRunEvents parameters: - $ref: '#/components/parameters/RunId' + - name: mode + in: query + required: false + schema: + type: string + example: values,debug + description: Comma-separated stream modes to include (`values`, `updates`, `tasks`, `debug`, `custom`). Defaults to all modes. + - name: after_seq + in: query + required: false + schema: + type: integer + format: int64 + minimum: 0 + default: 0 + description: Return buffered stream events after this sequence number. responses: '200': - description: SSE event stream + description: Stream snapshot content: - text/event-stream: + application/json: schema: - type: string - description: | - SSE format: each event has `id:`, `event:`, `data:` lines. - `data` is a JSON object with the same shape as `Event`. + $ref: '#/components/schemas/StreamSnapshot' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' /runs/{id}/checkpoints: get: @@ -584,36 +592,38 @@ paths: /runs/{id}/resume: post: tags: [Runs] - summary: Resume run from a checkpoint + summary: Resume an interrupted run + description: | + Resumes the latest checkpoint for a run currently in `interrupted` + status. Optional `state_updates` are applied before the run is moved + back to `running`. operationId: resumeRun parameters: - $ref: '#/components/parameters/RunId' requestBody: - required: true + required: false content: application/json: schema: type: object - required: [checkpoint_id] properties: - checkpoint_id: - type: string - description: Checkpoint to resume from. - state_overrides: + state_updates: type: object additionalProperties: true - description: Optional state values to inject before resuming. + description: Optional state values to merge before resuming. 
responses: '200': description: Run resumed content: application/json: schema: - $ref: '#/components/schemas/Run' - '400': - $ref: '#/components/responses/BadRequest' + $ref: '#/components/schemas/RunStatusResponse' '404': $ref: '#/components/responses/NotFound' + '409': + $ref: '#/components/responses/Conflict' + '500': + $ref: '#/components/responses/InternalError' /runs/fork: post: @@ -629,12 +639,11 @@ paths: application/json: schema: type: object - required: [parent_run_id, checkpoint_id] + required: [checkpoint_id] properties: - parent_run_id: - type: string checkpoint_id: type: string + description: Existing checkpoint to fork from; the parent run is inferred from this checkpoint. state_overrides: type: object additionalProperties: true @@ -644,11 +653,13 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/RunCreated' + $ref: '#/components/schemas/ForkRunResponse' '400': $ref: '#/components/responses/BadRequest' '404': $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' /runs/{id}/state: post: @@ -676,49 +687,58 @@ paths: type: string description: Optional def_step_id — defer until that step finishes. responses: - '202': - description: Injection queued + '200': + description: State update applied immediately or queued for a later step content: application/json: schema: - type: object - properties: - queued: { type: boolean, enum: [true] } - injection_id: { type: integer, format: int64 } + $ref: '#/components/schemas/StateInjectionResponse' '400': $ref: '#/components/responses/BadRequest' '404': $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' /runs/{id}/replay: post: tags: [Runs] - summary: Replay a completed run + summary: Replay a run from a checkpoint description: | - Re-executes a run deterministically from its frozen workflow - snapshot. Useful for debugging non-determinism or regression checks. 
+ Resets the existing run back to a checkpoint, deletes later steps and + checkpoints, and moves the same run back to `running`. operationId: replayRun parameters: - $ref: '#/components/parameters/RunId' requestBody: - required: false + required: true content: application/json: schema: type: object + anyOf: + - required: [from_checkpoint_id] + - required: [checkpoint_id] properties: - from_step: + from_checkpoint_id: type: string - description: Optional def_step_id to start replay from. + description: Canonical checkpoint id to replay from. + checkpoint_id: + type: string + description: Backward-compatible alias for `from_checkpoint_id`. responses: - '201': - description: Replay run created + '200': + description: Run reset to checkpoint and moved to running content: application/json: schema: - $ref: '#/components/schemas/RunCreated' + $ref: '#/components/schemas/ReplayRunResponse' + '400': + $ref: '#/components/responses/BadRequest' '404': $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' /workflows: post: @@ -737,28 +757,24 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Workflow' + $ref: '#/components/schemas/WorkflowCreated' '400': $ref: '#/components/responses/BadRequest' - '409': - description: Workflow id already exists - content: - application/json: - schema: - $ref: '#/components/schemas/ErrorResponse' + '500': + $ref: '#/components/responses/InternalError' get: tags: [Workflows] summary: List stored workflows operationId: listWorkflows responses: '200': - description: Array of workflows (definitions omitted in summary form) + description: Array of workflows including definitions content: application/json: schema: type: array items: - $ref: '#/components/schemas/WorkflowSummary' + $ref: '#/components/schemas/Workflow' /workflows/{id}: get: @@ -780,9 +796,9 @@ paths: tags: [Workflows] summary: Update workflow definition description: | - Replaces the workflow definition. 
Increments `version`. Existing - runs already created from older versions are unaffected (they hold - a frozen snapshot of the workflow). + Replaces the workflow definition. If `version` is omitted, the stored + version is preserved. Existing runs already created from older versions + are unaffected because they hold a frozen snapshot of the workflow. operationId: updateWorkflow parameters: - $ref: '#/components/parameters/WorkflowId' @@ -798,7 +814,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/Workflow' + $ref: '#/components/schemas/OkResponse' '400': $ref: '#/components/responses/BadRequest' '404': @@ -815,9 +831,7 @@ paths: content: application/json: schema: - type: object - properties: - ok: { type: boolean, enum: [true] } + $ref: '#/components/schemas/OkResponse' '404': $ref: '#/components/responses/NotFound' @@ -826,8 +840,9 @@ paths: tags: [Workflows] summary: Validate a stored workflow definition description: | - Runs the same validation as `POST /runs` but without creating a - run. Returns a list of validation errors (empty array on success). + Runs graph-based workflow validation without creating a run. Returns + a list of validation errors (empty array on success) and, when + generation succeeds, a Mermaid diagram. operationId: validateWorkflow parameters: - $ref: '#/components/parameters/WorkflowId' @@ -840,6 +855,8 @@ paths: $ref: '#/components/schemas/ValidationResult' '404': $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' /workflows/{id}/mermaid: get: @@ -869,16 +886,12 @@ paths: tags: [Workflows] summary: Launch a run from a stored workflow description: | - Convenience over `POST /runs` when the workflow definition is - already stored. Idempotency works the same way. + Convenience over `POST /runs` when the graph workflow definition is + already stored. The current implementation accepts an optional + `input` object and does not implement idempotency for this endpoint. 
operationId: runWorkflow parameters: - $ref: '#/components/parameters/WorkflowId' - - name: Idempotency-Key - in: header - required: false - schema: - type: string requestBody: required: false content: @@ -889,30 +902,19 @@ paths: input: type: object additionalProperties: true - callbacks: - type: object - additionalProperties: true - idempotency_key: - type: string responses: '201': description: Run created content: application/json: schema: - $ref: '#/components/schemas/RunCreated' - '200': - description: Idempotent replay - content: - application/json: - schema: - $ref: '#/components/schemas/RunCreated' + $ref: '#/components/schemas/RunStatusResponse' '400': $ref: '#/components/responses/BadRequest' '404': $ref: '#/components/responses/NotFound' - '503': - description: Drain mode + '500': + $ref: '#/components/responses/InternalError' /tracker/status: get: @@ -920,8 +922,8 @@ paths: summary: NullTickets bridge status description: | Reports whether NullBoiler is connected to a NullTickets tracker - and basic queue health. Returns a degraded-but-200 status when - no tracker is configured. + and basic queue health. Returns `404 tracker_disabled` when no + tracker is configured. 
operationId: getTrackerStatus responses: '200': @@ -930,6 +932,10 @@ paths: application/json: schema: $ref: '#/components/schemas/TrackerStatus' + '404': + $ref: '#/components/responses/TrackerDisabled' + '500': + $ref: '#/components/responses/InternalError' /tracker/tasks: get: @@ -944,7 +950,11 @@ paths: schema: type: array items: - $ref: '#/components/schemas/TrackerRun' + $ref: '#/components/schemas/TrackerTask' + '404': + $ref: '#/components/responses/TrackerDisabled' + '500': + $ref: '#/components/responses/InternalError' /tracker/tasks/{task_id}: get: @@ -963,9 +973,11 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/TrackerRun' + $ref: '#/components/schemas/TrackerTask' '404': $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' /tracker/stats: get: @@ -978,34 +990,27 @@ paths: content: application/json: schema: - type: object - additionalProperties: - type: integer - description: Map of `TrackerTaskState` to count. - example: - running: 4 - completed: 192 - failed: 7 - stalled: 1 + $ref: '#/components/schemas/TrackerStats' + '404': + $ref: '#/components/responses/TrackerDisabled' + '500': + $ref: '#/components/responses/InternalError' /tracker/refresh: post: tags: [Tracker] summary: Force a tracker poll cycle description: | - Asks the tracker bridge to immediately claim and dispatch tasks - instead of waiting for the next poll tick. Idempotent — if a poll - is already in-flight, the request is dropped. + Acknowledges an operator refresh request. The current handler returns + success immediately. 
operationId: refreshTracker responses: - '202': - description: Refresh queued + '200': + description: Refresh request acknowledged content: application/json: schema: - type: object - properties: - queued: { type: boolean, enum: [true] } + $ref: '#/components/schemas/TrackerRefreshResponse' /internal/agent-events/{run_id}/{step_id}: post: @@ -1119,6 +1124,30 @@ components: error: code: not_found message: run not found + Conflict: + description: Request conflicts with current resource state + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + conflict: + value: + error: + code: conflict + message: run is not failed + TrackerDisabled: + description: Pull-mode tracker is not configured + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + disabled: + value: + error: + code: tracker_disabled + message: pull-mode tracker is not configured InternalError: description: Internal server error content: @@ -1172,6 +1201,7 @@ components: - internal - draining - unauthorized + - tracker_disabled message: type: string @@ -1341,57 +1371,46 @@ components: True when at least one more page exists. Computed by fetching `limit + 1` rows and checking the overflow. - Run: + RunDetail: type: object - required: [id, status, workflow_id, created_at_ms, updated_at_ms] + required: + - id + - status + - created_at_ms + - updated_at_ms + - checkpoint_count + - total_input_tokens + - total_output_tokens + - total_tokens + - steps properties: id: { type: string } + status: { $ref: '#/components/schemas/RunStatus' } idempotency_key: type: string - nullable: true - status: { $ref: '#/components/schemas/RunStatus' } + description: Present only when the run was created with an idempotency key. workflow_id: type: string - nullable: true + description: Present only when the run was launched from a stored workflow. error_text: type: string - nullable: true + description: Present only when the run has an error. 
+ started_at_ms: { type: integer, format: int64 } + ended_at_ms: { type: integer, format: int64 } + state_json: + type: object + additionalProperties: true + description: Present only when persisted state exists for the run. + checkpoint_count: { type: integer, format: int64, minimum: 0 } + total_input_tokens: { type: integer, format: int64, minimum: 0 } + total_output_tokens: { type: integer, format: int64, minimum: 0 } + total_tokens: { type: integer, format: int64, minimum: 0 } created_at_ms: { type: integer, format: int64 } updated_at_ms: { type: integer, format: int64 } - started_at_ms: - type: integer - format: int64 - nullable: true - ended_at_ms: - type: integer - format: int64 - nullable: true - parent_run_id: - type: string - nullable: true - - RunDetail: - allOf: - - $ref: '#/components/schemas/Run' - - type: object - properties: - workflow: - type: object - additionalProperties: true - description: Frozen workflow snapshot for this run. - input: - type: object - additionalProperties: true - callbacks: - type: object - additionalProperties: true - state: - type: object - additionalProperties: true - steps: - type: array - items: - $ref: '#/components/schemas/Step' + steps: + type: array + items: + $ref: '#/components/schemas/Step' RunCreate: type: object @@ -1437,6 +1456,40 @@ components: idempotent_replay: type: boolean + RunStatusResponse: + type: object + required: [id, status] + properties: + id: { type: string } + status: { $ref: '#/components/schemas/RunStatus' } + + ForkRunResponse: + type: object + required: [id, status, forked_from_checkpoint] + properties: + id: { type: string } + status: { $ref: '#/components/schemas/RunStatus' } + forked_from_checkpoint: { type: string } + + ReplayRunResponse: + type: object + required: [id, status, replayed_from_checkpoint] + properties: + id: { type: string } + status: { $ref: '#/components/schemas/RunStatus' } + replayed_from_checkpoint: { type: string } + + StateInjectionResponse: + type: object + 
required: [applied] + properties: + applied: + type: boolean + description: True when updates were applied immediately. + pending: + type: boolean + description: Present and true when updates were queued until `apply_after_step`. + StepDefinition: type: object required: [id] @@ -1463,7 +1516,7 @@ components: timeout_ms: type: integer format: int64 - minimum: 0 + minimum: 1 prompt_template: type: string worker: @@ -1510,13 +1563,11 @@ components: worker_id: type: string nullable: true - input: - type: object - additionalProperties: true - output: + output_json: type: object additionalProperties: true nullable: true + description: Present only after a step has output. error_text: type: string nullable: true @@ -1569,6 +1620,52 @@ components: additionalProperties: true ts_ms: { type: integer, format: int64 } + StreamEvent: + type: object + required: [seq, event, mode, data] + properties: + seq: { type: integer, format: int64, minimum: 1 } + event: { type: string } + mode: + type: string + enum: [values, updates, tasks, debug, custom] + data: + type: object + additionalProperties: true + + StreamPersistedEvent: + type: object + required: [kind, data, ts_ms] + properties: + kind: { type: string } + data: + type: object + additionalProperties: true + ts_ms: { type: integer, format: int64 } + + StreamSnapshot: + type: object + required: [status, events, stream_events, next_stream_seq, stream_oldest_seq, stream_gap] + properties: + status: { $ref: '#/components/schemas/RunStatus' } + state: + type: object + additionalProperties: true + description: Present only when persisted state exists for the run. 
+ events: + type: array + items: + $ref: '#/components/schemas/StreamPersistedEvent' + stream_events: + type: array + items: + $ref: '#/components/schemas/StreamEvent' + next_stream_seq: { type: integer, format: int64, minimum: 0 } + stream_oldest_seq: { type: integer, format: int64, minimum: 0 } + stream_gap: + type: boolean + description: True when requested events are older than the retained buffer. + Checkpoint: type: object required: [id, run_id, step_id, version, created_at_ms] @@ -1600,68 +1697,91 @@ components: id: { type: string } name: { type: string } definition: - type: object - required: [steps] - properties: - steps: - type: array - items: - $ref: '#/components/schemas/StepDefinition' - strategy: - type: string + $ref: '#/components/schemas/WorkflowDefinition' version: { type: integer, format: int64, minimum: 1 } created_at_ms: { type: integer, format: int64 } updated_at_ms: { type: integer, format: int64 } - WorkflowSummary: + WorkflowCreated: type: object - required: [id, name, version, created_at_ms, updated_at_ms] + required: [id, name, version] properties: id: { type: string } name: { type: string } version: { type: integer, format: int64 } - created_at_ms: { type: integer, format: int64 } - updated_at_ms: { type: integer, format: int64 } + + OkResponse: + type: object + required: [ok] + properties: + ok: { type: boolean, enum: [true] } + + WorkflowDefinition: + type: object + description: Graph workflow definition consumed by the engine. + required: [nodes, edges] + properties: + nodes: + type: object + additionalProperties: + type: object + additionalProperties: true + description: Map of node name to node configuration. + edges: + type: array + items: + type: array + minItems: 2 + maxItems: 2 + items: + type: string + description: Directed edges such as `["__start__", "node"]` or `["router:yes", "next"]`. 
+ state_schema: + type: object + additionalProperties: + type: object + additionalProperties: true + interrupt_before: + type: array + items: { type: string } + interrupt_after: + type: array + items: { type: string } + additionalProperties: true WorkflowCreate: type: object - required: [id, name, definition] properties: id: type: string minLength: 1 + description: Optional. Generated when omitted. name: type: string - minLength: 1 - definition: - type: object - required: [steps] - properties: - steps: - type: array - minItems: 1 - items: - $ref: '#/components/schemas/StepDefinition' - strategy: - type: string + default: untitled + version: + type: integer + format: int64 + minimum: 1 + default: 1 + definition_json: + $ref: '#/components/schemas/WorkflowDefinition' + additionalProperties: true WorkflowUpdate: type: object - required: [definition] properties: name: type: string - definition: - type: object - required: [steps] - properties: - steps: - type: array - minItems: 1 - items: - $ref: '#/components/schemas/StepDefinition' - strategy: - type: string + default: untitled + version: + type: integer + format: int64 + minimum: 1 + description: Optional explicit version. When omitted, the existing version is preserved. + definition_json: + $ref: '#/components/schemas/WorkflowDefinition' + additionalProperties: true ValidationResult: type: object @@ -1673,15 +1793,19 @@ components: type: array items: type: object - required: [code, message] + required: [type, message] properties: - code: + type: type: string - message: + node: + type: string + key: type: string - path: + message: type: string - description: JSON pointer into the workflow definition. + mermaid: + type: string + description: Present when Mermaid generation succeeds. 
RateLimitEntry: type: object @@ -1695,72 +1819,70 @@ components: TrackerStatus: type: object - required: [enabled] + required: [mode, running_count, max_concurrent, completed_count, failed_count, poll_interval_ms, running] properties: - enabled: { type: boolean } - url: + mode: type: string - format: uri - nullable: true - last_poll_ms: - type: integer - format: int64 - nullable: true - active_leases: { type: integer, format: int64 } - message: - type: string - description: Human-readable status, especially when `enabled=false`. + enum: [pull] + running_count: { type: integer, format: int64, minimum: 0 } + max_concurrent: { type: integer, format: int64, minimum: 1 } + completed_count: { type: integer, format: int64, minimum: 0 } + failed_count: { type: integer, format: int64, minimum: 0 } + poll_interval_ms: { type: integer, format: int64, minimum: 1 } + running: + type: array + items: + $ref: '#/components/schemas/TrackerTask' - TrackerRun: + TrackerTask: type: object required: - task_id - - tracker_run_id - - boiler_run_id - - lease_id + - task_title - pipeline_id - agent_role - - task_title - - task_stage - - task_version - - artifact_kind + - execution + - current_turn + - max_turns + - started_at_ms + - last_activity_ms - state - - claimed_at_ms properties: task_id: { type: string } - tracker_run_id: { type: string } - boiler_run_id: { type: string } - lease_id: { type: string } + task_title: { type: string } pipeline_id: { type: string } agent_role: { type: string } - task_title: { type: string } - task_stage: { type: string } - task_version: { type: integer, format: int64 } - success_trigger: + execution: type: string - nullable: true - artifact_kind: { type: string } + enum: [subprocess, dispatch] + current_turn: { type: integer, format: int64, minimum: 0 } + max_turns: { type: integer, format: int64, minimum: 1 } + started_at_ms: { type: integer, format: int64 } + last_activity_ms: { type: integer, format: int64 } state: { $ref: 
'#/components/schemas/TrackerTaskState' } - claimed_at_ms: { type: integer, format: int64 } - last_heartbeat_ms: - type: integer - format: int64 - nullable: true - lease_expires_at_ms: - type: integer - format: int64 - nullable: true - completed_at_ms: - type: integer - format: int64 - nullable: true - last_error_text: + + TrackerStats: + type: object + required: [running, completed, failed, total, max_concurrent] + properties: + running: { type: integer, format: int64, minimum: 0 } + completed: { type: integer, format: int64, minimum: 0 } + failed: { type: integer, format: int64, minimum: 0 } + total: { type: integer, format: int64, minimum: 0 } + max_concurrent: { type: integer, format: int64, minimum: 1 } + + TrackerRefreshResponse: + type: object + required: [status, message] + properties: + status: + type: string + enum: [ok] + message: type: string - nullable: true AgentEventPayload: type: object - required: [iteration, status] properties: iteration: type: integer @@ -1773,9 +1895,9 @@ components: type: object additionalProperties: true nullable: true - result_text: + result: type: string nullable: true status: type: string - enum: [tool_call, tool_result, model_step, error, completed] + default: running diff --git a/src/api.zig b/src/api.zig index 3ddc132..b5ee09e 100644 --- a/src/api.zig +++ b/src/api.zig @@ -232,7 +232,7 @@ pub fn handleRequest(ctx: *Context, method: []const u8, target: []const u8, body return handleInjectState(ctx, seg1.?, body); } - // ── SSE stream endpoint ───────────────────────────────────────── + // ── Stream snapshot endpoint ──────────────────────────────────── // GET /runs/{id}/stream if (is_get and eql(seg0, "runs") and seg1 != null and eql(seg2, "stream") and seg3 == null) { @@ -1506,9 +1506,10 @@ fn handleForkRun(ctx: *Context, body: []const u8) HttpResponse { }; const run_id_json = jsonQuoted(ctx.allocator, new_run_id) catch return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"out of memory\"}}"); + const 
checkpoint_id_json = jsonQuoted(ctx.allocator, checkpoint_id) catch return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"out of memory\"}}"); const resp = std.fmt.allocPrint(ctx.allocator, \\{{"id":{s},"status":"running","forked_from_checkpoint":{s}}} - , .{ run_id_json, checkpoint_id }) catch return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"out of memory\"}}"); + , .{ run_id_json, checkpoint_id_json }) catch return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"out of memory\"}}"); return jsonResponse(201, resp); } @@ -1630,12 +1631,12 @@ fn handleInjectState(ctx: *Context, run_id: []const u8, body: []const u8) HttpRe } } -// ── SSE Stream Handler ────────────────────────────────────────────── +// ── Stream Snapshot Handler ───────────────────────────────────────── fn handleStream(ctx: *Context, run_id: []const u8, target: []const u8) HttpResponse { // For now, return the current state and events as a regular JSON response. - // Full SSE streaming with held-open connections will be implemented - // when the threading model is wired in main.zig (Task 12). + // Held-open SSE is handled internally by the hub; the HTTP endpoint + // returns snapshots so independent consumers can use after_seq cursors. // // Supports ?mode=values,tasks,debug,updates,custom query param to filter // which streaming modes the client wants. Default: all modes. 
@@ -2667,6 +2668,30 @@ test "API: list runs supports workflow_id filter" { try std.testing.expect(std.mem.indexOf(u8, resp.body, "\"workflow_id\":\"wf_2\"") == null); } +test "API: fork run returns quoted checkpoint id" { + const allocator = std.testing.allocator; + var store = try Store.init(allocator, ":memory:"); + defer store.deinit(); + + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + + try store.createRunWithState("r1", null, "{\"nodes\":{}}", "{}", "{\"x\":1}"); + try store.createCheckpoint("cp-one", "r1", "step_a", null, "{\"x\":1}", "[\"step_a\"]", 1, null); + + var ctx = Context{ + .store = &store, + .allocator = arena.allocator(), + }; + + const resp = handleRequest(&ctx, "POST", "/runs/fork", "{\"checkpoint_id\":\"cp-one\"}"); + try std.testing.expectEqual(@as(u16, 201), resp.status_code); + + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, resp.body, .{}); + defer parsed.deinit(); + try std.testing.expectEqualStrings("cp-one", parsed.value.object.get("forked_from_checkpoint").?.string); +} + test "API: replay run from checkpoint" { const allocator = std.testing.allocator; var store = try Store.init(allocator, ":memory:");