diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..11894db --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,6 @@ +# Documentation + +Project documentation has moved to [README.md](README.md) and `docs/`. + +- Read first: `README.md` +- Design docs: `docs/plans/` diff --git a/HACKATHON_SUBMISSION.md b/HACKATHON_SUBMISSION.md new file mode 100644 index 0000000..7c64695 --- /dev/null +++ b/HACKATHON_SUBMISSION.md @@ -0,0 +1,98 @@ +# Agent Flight Recorder + +## Problem Discovered + +NullWatch already provides the observability layer for the nullclaw ecosystem: +run summaries, spans, evals, OTLP ingest, cost, token usage, and failure context. +It also exports a NullHub-compatible manifest. NullHub already provides the +operator UI and orchestration pages, but it did not register NullWatch or expose +its tracing/eval data in the UI. + +## Chosen Solution + +Add a local-first Observability cockpit to NullHub: + +- register `nullwatch` as a known component +- proxy `/api/observability/*` to a managed NullWatch instance +- add a Flight Recorder page for runs, spans, evals, cost, tokens, and errors +- document the local demo flow through NullHub's managed install path + +## Why This Idea Was Chosen + +This is stronger than a single CLI preflight because it connects multiple parts +of the ecosystem into a visible agent platform story: execution, orchestration, +task tracking, observability, and operations. It is still hackathon-sized because +it uses existing NullWatch APIs and NullHub UI patterns instead of changing core +agent runtime behavior. + +## What Was Implemented + +- NullWatch component registration in the NullHub registry. +- Observability reverse proxy with optional bearer token forwarding. +- Sidebar entry and `/observability` UI page. +- API client methods for NullWatch summary, runs, spans, evals, and health. +- README documentation for the proxy and local demo setup. + +## Files Changed + +- `src/installer/registry.zig` +- `src/api/observability.zig` +- `src/api/proxy.zig` +- `src/api/components.zig` +- `src/api/meta.zig` +- `src/root.zig` +- `src/server.zig` +- `ui/src/lib/api/client.ts` +- `ui/src/lib/components/Sidebar.svelte` +- `ui/src/routes/observability/+page.svelte` +- `README.md` +- `HACKATHON_SUBMISSION.md` + +## How To Test Or Demo + +Start NullHub: + +```bash +zig build run -- serve --no-open +``` + +Install NullWatch from NullHub: + +1. Open the web UI. +2. Go to `Install Component`. +3. Select `NullWatch`. +4. Keep or set the API port to `7710`. +5. Finish the wizard. The installer starts the NullWatch instance and NullHub + discovers it automatically. + +Optional sample data can be ingested through the NullHub proxy: + +```bash +curl -X POST http://127.0.0.1:19800/api/observability/v1/spans \ + -H 'Content-Type: application/json' \ + -d '{"run_id":"demo-run-1","trace_id":"trace-demo-1","span_id":"span-1","source":"nullclaw","operation":"tool.call","status":"error","started_at_ms":1710000000000,"ended_at_ms":1710000001500,"tool_name":"shell","error_message":"tool call failed: command timed out","attributes_json":"{\"exit_code\":124}"}' + +curl -X POST http://127.0.0.1:19800/api/observability/v1/evals \ + -H 'Content-Type: application/json' \ + -d '{"run_id":"demo-run-1","eval_key":"tool_success","scorer":"deterministic","score":0.0,"verdict":"fail","dataset":"demo","notes":"The tool call timed out."}' +``` + +Open `/observability` in NullHub and inspect the NullWatch runs. + +## Screenshots + +Flight Recorder overview: + +![NullHub Observability overview](docs/screenshots/nullhub-observability-overview.png) + +Failure detail with tool-call error context: + +![NullHub Observability failure detail](docs/screenshots/nullhub-observability-failure.png) + +## Limitations And Future Improvements + +- `NULLWATCH_URL` remains useful for pointing NullHub at an external NullWatch + instance, but the default demo path uses a managed NullWatch install. +- The first UI version renders a compact timeline, not a full waterfall chart. +- Run correlation with NullBoiler orchestration pages can be added as a follow-up + when both systems share stable run ids. diff --git a/README.md b/README.md index e2b7253..773d3cb 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Management hub for the nullclaw ecosystem. `NullHub` is a single Zig binary with an embedded Svelte web UI for installing, configuring, monitoring, and updating ecosystem components (NullClaw, NullBoiler, -NullTickets). +NullTickets, NullWatch). ## Features @@ -15,13 +15,14 @@ NullTickets). - **Process supervision** -- start, stop, restart, crash recovery with backoff - **Health monitoring** -- periodic HTTP health checks, dashboard status cards - **Cross-component linking** -- auto-connect `NullTickets -> NullBoiler`, generate native tracker config, and inspect queue/orchestrator status from one UI -- **Config management** -- structured editors for `NullClaw`, `NullBoiler`, and `NullTickets`, with raw JSON fallback when needed +- **Config management** -- structured editors for `NullClaw`, `NullBoiler`, `NullTickets`, and `NullWatch`, with raw JSON fallback when needed - **Log viewing** -- tail and live SSE streaming per instance - **One-click updates** -- download, migrate config, rollback on failure - **Multi-instance** -- run multiple instances of the same component side by side - **Web UI + CLI** -- browser dashboard for humans, CLI for automation - **Managed instance admin API** -- instance-scoped status, config, models, cron, channels, and skills routes for managed NullClaw installs - **Orchestration UI** -- workflow editor, poll-based run monitoring, checkpoint forking, encoded workflow/run/store links, and key-value store browser (proxied to NullTickets through NullHub) +- **Observability cockpit** -- local NullWatch run summaries, span timelines, eval results, token usage, cost, and error context through a NullHub proxy ## Quick Start @@ -119,6 +120,47 @@ to the local orchestration stack. Most routes go to NullBoiler's REST API via `/api/orchestration/store/*` is proxied to NullTickets via `NULLTICKETS_URL` and optional `NULLTICKETS_TOKEN`. +**Observability proxy** -- requests to `/api/observability/*` are reverse-proxied +to the managed NullWatch instance installed in NullHub. `NULLWATCH_URL` can +still override the target for an external NullWatch instance, and +`NULLWATCH_TOKEN` overrides the managed instance token when set. The built-in +Observability page uses this proxy to display run summaries, spans, evals, +latency, cost, and failure context without sending data to hosted services. + +Local NullWatch setup: + +1. Start NullHub: + + ```bash + zig build run -- serve --no-open + ``` + +2. In the web UI, open **Install Component**, select **NullWatch**, keep or set + the API port to `7710`, and finish the wizard. The installer starts the + NullWatch instance and the observability proxy discovers it automatically. + +3. Optional demo data can be ingested through the NullHub proxy: + + ```bash + curl -X POST http://127.0.0.1:19800/api/observability/v1/spans \ + -H 'Content-Type: application/json' \ + -d '{"run_id":"demo-run-1","trace_id":"trace-demo-1","span_id":"span-1","source":"nullclaw","operation":"tool.call","status":"error","started_at_ms":1710000000000,"ended_at_ms":1710000001500,"tool_name":"shell","error_message":"tool call failed: command timed out","attributes_json":"{\"exit_code\":124}"}' + + curl -X POST http://127.0.0.1:19800/api/observability/v1/evals \ + -H 'Content-Type: application/json' \ + -d '{"run_id":"demo-run-1","eval_key":"tool_success","scorer":"deterministic","score":0.0,"verdict":"fail","dataset":"demo","notes":"The tool call timed out."}' + ``` + +### Observability Screenshots + +Flight Recorder overview: + +![NullHub Observability overview](docs/screenshots/nullhub-observability-overview.png) + +Failure detail with tool-call error context: + +![NullHub Observability failure detail](docs/screenshots/nullhub-observability-failure.png) + ## Development Testing strategy and roadmap live in [TESTING.md](TESTING.md). @@ -159,12 +201,14 @@ src/ auth.zig # Optional bearer token auth api/ # REST endpoints (components, instances, wizard, ...) orchestration.zig # Reverse proxy to NullBoiler orchestration API + observability.zig # Reverse proxy to NullWatch tracing/eval API core/ # Manifest parser, state, platform, paths installer/ # Download, build, UI module fetching supervisor/ # Process spawn, health checks, manager ui/src/ routes/ # SvelteKit pages orchestration/ # Orchestration pages (dashboard, workflows, runs, store) + observability/ # NullWatch flight recorder page lib/components/ # Reusable Svelte components orchestration/ # GraphViewer, StateInspector, RunEventLog, InterruptPanel, # CheckpointTimeline, WorkflowJsonEditor, NodeCard, SendProgressBar diff --git a/docs/screenshots/nullhub-observability-failure.png b/docs/screenshots/nullhub-observability-failure.png new file mode 100644 index 0000000..19f38d4 Binary files /dev/null and b/docs/screenshots/nullhub-observability-failure.png differ diff --git a/docs/screenshots/nullhub-observability-overview.png b/docs/screenshots/nullhub-observability-overview.png new file mode 100644 index 0000000..31058e0 Binary files /dev/null and b/docs/screenshots/nullhub-observability-overview.png differ diff --git a/docs/superpowers/specs/2026-03-18-report-command-design.md b/docs/superpowers/specs/2026-03-18-report-command-design.md index 39c1273..3db7881 100644 --- a/docs/superpowers/specs/2026-03-18-report-command-design.md +++ b/docs/superpowers/specs/2026-03-18-report-command-design.md @@ -14,7 +14,7 @@ Create GitHub issues with pre-filled system data from CLI and Web UI. Order is fixed (as listed above) in both CLI and Web UI selectors. -The target list is hardcoded in `report.zig`, independent of `known_components` in `registry.zig`. nullwatch exists as a repo but is not yet in the component registry. +The target list is hardcoded in `report.zig`, independent of `known_components` in `registry.zig`. `nullwatch` is also a known installable component, so report target metadata should stay aligned with the registry entry. ## Report types and labels diff --git a/src/api/components.zig b/src/api/components.zig index f932cf1..048bd83 100644 --- a/src/api/components.zig +++ b/src/api/components.zig @@ -1,5 +1,6 @@ const std = @import("std"); const std_compat = @import("compat"); +const builtin = @import("builtin"); const registry = @import("../installer/registry.zig"); const paths_mod = @import("../core/paths.zig"); const state_mod = @import("../core/state.zig"); @@ -21,7 +22,12 @@ pub fn deriveDisplayName(allocator: std.mem.Allocator, name: []const u8) ![]cons /// Check if a component has a standalone installation at ~/.{component}/config.json fn hasStandaloneInstall(allocator: std.mem.Allocator, component: []const u8) bool { - const home = std_compat.process.getEnvVarOwned(allocator, "HOME") catch return false; + const home = std_compat.process.getEnvVarOwned(allocator, "HOME") catch blk: { + if (builtin.os.tag == .windows) { + break :blk std_compat.process.getEnvVarOwned(allocator, "USERPROFILE") catch return false; + } + return false; + }; defer allocator.free(home); const dot_name = std.fmt.allocPrint(allocator, ".{s}", .{component}) catch return false; defer allocator.free(dot_name); @@ -176,7 +182,7 @@ test "deriveDisplayName capitalizes first letter" { try std.testing.expectEqualStrings("", name3); } -test "handleList returns valid JSON with all 3 known components" { +test "handleList returns valid JSON with all known components" { const allocator = std.testing.allocator; var fixture = try test_helpers.TempPaths.init(allocator); defer fixture.deinit(); @@ -192,30 +198,34 @@ test "handleList returns valid JSON with all 3 known components" { try std.testing.expect(std.mem.startsWith(u8, json, "{\"components\":[")); try std.testing.expect(std.mem.endsWith(u8, json, "]}")); - // Verify all 3 components are present + // Verify all components are present try std.testing.expect(std.mem.indexOf(u8, json, "\"nullclaw\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"nullboiler\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"nulltickets\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"nullwatch\"") != null); // Verify display names try std.testing.expect(std.mem.indexOf(u8, json, "\"NullClaw\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"NullBoiler\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"NullTickets\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"NullWatch\"") != null); // Verify descriptions are present try std.testing.expect(std.mem.indexOf(u8, json, "Autonomous AI agent runtime") != null); try std.testing.expect(std.mem.indexOf(u8, json, "DAG-based workflow orchestrator") != null); try std.testing.expect(std.mem.indexOf(u8, json, "Task and issue tracker") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "Headless observability") != null); // Verify repo fields try std.testing.expect(std.mem.indexOf(u8, json, "\"nullclaw/nullclaw\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"nullclaw/NullBoiler\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"nullclaw/nulltickets\"") != null); + try std.testing.expect(std.mem.indexOf(u8, json, "\"nullclaw/nullwatch\"") != null); // Verify structural fields try std.testing.expect(std.mem.indexOf(u8, json, "\"alpha\"") != null); try std.testing.expectEqual(@as(usize, 2), std.mem.count(u8, json, "\"alpha\":true")); - try std.testing.expectEqual(@as(usize, 1), std.mem.count(u8, json, "\"alpha\":false")); + try std.testing.expectEqual(@as(usize, 2), std.mem.count(u8, json, "\"alpha\":false")); try std.testing.expect(std.mem.indexOf(u8, json, "\"installed\"") != null); try std.testing.expect(std.mem.indexOf(u8, json, "\"instance_count\"") != null); } diff --git a/src/api/instance_runtime.zig b/src/api/instance_runtime.zig index fe4cfe7..054179c 100644 --- a/src/api/instance_runtime.zig +++ b/src/api/instance_runtime.zig @@ -4,6 +4,7 @@ const state_mod = @import("../core/state.zig"); const manager_mod = @import("../supervisor/manager.zig"); const paths_mod = @import("../core/paths.zig"); const health_mod = @import("../supervisor/health.zig"); +const registry = @import("../installer/registry.zig"); pub const Snapshot = struct { status: manager_mod.Status, @@ -49,12 +50,57 @@ pub fn readPortFromConfig(allocator: std.mem.Allocator, paths: paths_mod.Paths, } } - return switch (current) { - .integer => |value| if (value >= 0 and value <= 65535) @intCast(value) else null, + return parsePortValue(current); +} + +fn parsePortValue(value: std.json.Value) ?u16 { + return switch (value) { + .integer => |raw| if (raw >= 0 and raw <= 65535) @intCast(raw) else null, + .number_string => |raw| std.fmt.parseInt(u16, raw, 10) catch null, + .string => |raw| std.fmt.parseInt(u16, raw, 10) catch null, else => null, }; } +fn readStringFromConfig(allocator: std.mem.Allocator, paths: paths_mod.Paths, component: []const u8, name: []const u8, dot_key: []const u8) ?[]u8 { + const config_path = paths.instanceConfig(allocator, component, name) catch return null; + defer allocator.free(config_path); + + const file = std_compat.fs.openFileAbsolute(config_path, .{}) catch return null; + defer file.close(); + const contents = file.readToEndAlloc(allocator, 4 * 1024 * 1024) catch return null; + defer allocator.free(contents); + + const parsed = std.json.parseFromSlice(std.json.Value, allocator, contents, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }) catch return null; + defer parsed.deinit(); + + var current = parsed.value; + var it = std.mem.splitScalar(u8, dot_key, '.'); + while (it.next()) |segment| { + switch (current) { + .object => |obj| current = obj.get(segment) orelse return null, + else => return null, + } + } + + if (current != .string) return null; + return allocator.dupe(u8, current.string) catch null; +} + +fn normalizeHealthHost(allocator: std.mem.Allocator, host: []const u8) ![]u8 { + if (host.len == 0 or + std.mem.eql(u8, host, "0.0.0.0") or + std.mem.eql(u8, host, "::") or + std.mem.eql(u8, host, "localhost")) + { + return allocator.dupe(u8, "127.0.0.1"); + } + return allocator.dupe(u8, host); +} + fn isImportedStandalone( allocator: std.mem.Allocator, paths: paths_mod.Paths, @@ -62,8 +108,9 @@ fn isImportedStandalone( name: []const u8, entry: state_mod.InstanceEntry, ) bool { - if (!std.mem.eql(u8, component, "nullclaw")) return false; - if (!std.mem.eql(u8, entry.launch_mode, "gateway")) return false; + const known = registry.findKnownComponent(component) orelse return false; + if (standalonePortConfigKey(component) == null) return false; + if (!isStandaloneLaunchMode(component, entry.launch_mode, known.default_launch_command)) return false; const inst_dir = paths.instanceDir(allocator, component, name) catch return false; defer allocator.free(inst_dir); @@ -81,6 +128,22 @@ fn isImportedStandalone( return std.mem.eql(u8, real_dir, real_standalone_root); } +fn standalonePortConfigKey(component: []const u8) ?[]const u8 { + if (std.mem.eql(u8, component, "nullclaw")) return "gateway.port"; + if (std.mem.eql(u8, component, "nullwatch")) return "port"; + return null; +} + +fn isStandaloneLaunchMode(component: []const u8, launch_mode: []const u8, default_launch_mode: []const u8) bool { + if (standalonePortConfigKey(component) == null) return false; + if (std.mem.eql(u8, launch_mode, default_launch_mode)) return true; + if (std.mem.eql(u8, component, "nullwatch")) { + return std.mem.eql(u8, launch_mode, "gateway") or + std.mem.eql(u8, launch_mode, "nullwatch"); + } + return false; +} + fn standaloneStatus(manager_snapshot: ?Snapshot, live_ok: bool) manager_mod.Status { if (live_ok) return .running; if (manager_snapshot) |snapshot| { @@ -102,10 +165,18 @@ fn deriveImportedStandaloneSnapshot( ) ?Snapshot { if (!isImportedStandalone(allocator, paths, component, name, entry)) return null; - const port = readPortFromConfig(allocator, paths, component, name, "gateway.port") orelse return null; + const known = registry.findKnownComponent(component) orelse return null; + const port_key = standalonePortConfigKey(component) orelse return null; + const port = readPortFromConfig(allocator, paths, component, name, port_key) orelse known.default_port; if (port == 0) return null; - const health = health_mod.check(allocator, "127.0.0.1", port, "/health"); + const configured_host = readStringFromConfig(allocator, paths, component, name, "host") orelse + allocator.dupe(u8, "127.0.0.1") catch return null; + defer allocator.free(configured_host); + const health_host = normalizeHealthHost(allocator, configured_host) catch return null; + defer allocator.free(health_host); + + const health = health_mod.check(allocator, health_host, port, known.default_health_endpoint); const status = standaloneStatus(manager_snapshot, health.ok); var snapshot = manager_snapshot orelse Snapshot{ .status = status }; snapshot.status = status; @@ -129,3 +200,43 @@ pub fn resolve( if (deriveImportedStandaloneSnapshot(allocator, paths, component, name, entry, manager_snapshot)) |snapshot| return snapshot; return manager_snapshot orelse .{ .status = .stopped }; } + +test "standalone runtime metadata covers nullclaw and nullwatch" { + try std.testing.expectEqualStrings("gateway.port", standalonePortConfigKey("nullclaw").?); + try std.testing.expectEqualStrings("port", standalonePortConfigKey("nullwatch").?); + try std.testing.expect(standalonePortConfigKey("nullboiler") == null); + + try std.testing.expect(isStandaloneLaunchMode("nullclaw", "gateway", "gateway")); + try std.testing.expect(isStandaloneLaunchMode("nullwatch", "serve", "serve")); + try std.testing.expect(isStandaloneLaunchMode("nullwatch", "gateway", "serve")); + try std.testing.expect(isStandaloneLaunchMode("nullwatch", "nullwatch", "serve")); + try std.testing.expect(!isStandaloneLaunchMode("nullboiler", "gateway", "gateway")); +} + +test "readPortFromConfig accepts string ports" { + const allocator = std.testing.allocator; + var fixture = try @import("../test_helpers.zig").TempPaths.init(allocator); + defer fixture.deinit(); + try fixture.paths.ensureDirs(); + + const inst_dir = try fixture.paths.instanceDir(allocator, "nullwatch", "watch"); + defer allocator.free(inst_dir); + try std_compat.fs.makeDirAbsolute(std.fs.path.dirname(inst_dir).?); + try std_compat.fs.makeDirAbsolute(inst_dir); + + const config_path = try fixture.paths.instanceConfig(allocator, "nullwatch", "watch"); + defer allocator.free(config_path); + const file = try std_compat.fs.createFileAbsolute(config_path, .{ .truncate = true }); + defer file.close(); + try file.writeAll("{\"port\":\"7711\",\"host\":\"::1\"}"); + + try std.testing.expectEqual(@as(u16, 7711), readPortFromConfig(allocator, fixture.paths, "nullwatch", "watch", "port").?); + + const host = readStringFromConfig(allocator, fixture.paths, "nullwatch", "watch", "host").?; + defer allocator.free(host); + try std.testing.expectEqualStrings("::1", host); + + const normalized = try normalizeHealthHost(allocator, "::"); + defer allocator.free(normalized); + try std.testing.expectEqualStrings("127.0.0.1", normalized); +} diff --git a/src/api/instances.zig b/src/api/instances.zig index 3987654..b93ed95 100644 --- a/src/api/instances.zig +++ b/src/api/instances.zig @@ -16,6 +16,9 @@ const nullclaw_web_channel = @import("../core/nullclaw_web_channel.zig"); const query_api = @import("query.zig"); const test_helpers = @import("../test_helpers.zig"); const instance_runtime = @import("instance_runtime.zig"); +const registry = @import("../installer/registry.zig"); +const downloader = @import("../installer/downloader.zig"); +const platform = @import("../core/platform.zig"); const ApiResponse = helpers.ApiResponse; const appendEscaped = helpers.appendEscaped; @@ -29,6 +32,139 @@ const default_tracker_prompt_template = // ─── Helpers ───────────────────────────────────────────────────────────────── +fn defaultLaunchModeForComponent(component: []const u8) []const u8 { + if (registry.findKnownComponent(component)) |known| return known.default_launch_command; + return "gateway"; +} + +fn isLegacyDefaultLaunchMode(component: []const u8, launch_mode: []const u8) bool { + const default_launch = defaultLaunchModeForComponent(component); + return !std.mem.eql(u8, default_launch, "gateway") and std.mem.eql(u8, launch_mode, "gateway"); +} + +const StartBinary = struct { + path: []const u8, + version: []const u8, + version_owned: bool = false, + + fn deinit(self: StartBinary, allocator: std.mem.Allocator) void { + allocator.free(self.path); + if (self.version_owned) allocator.free(self.version); + } +}; + +fn persistStartVersion( + s: *state_mod.State, + component: []const u8, + name: []const u8, + entry: state_mod.InstanceEntry, + version: []const u8, +) !void { + const updated = try s.updateInstance(component, name, .{ + .version = version, + .auto_start = entry.auto_start, + .launch_mode = entry.launch_mode, + .verbose = entry.verbose, + }); + if (!updated) return error.StateError; + s.save() catch return error.StateError; +} + +fn resolveStandaloneStartBinary( + allocator: std.mem.Allocator, + s: *state_mod.State, + paths: paths_mod.Paths, + component: []const u8, + name: []const u8, + entry: state_mod.InstanceEntry, +) !StartBinary { + if (local_binary.stageDevLocal(allocator, paths, component)) |dest_bin| { + persistStartVersion(s, component, name, entry, local_binary.dev_local_version) catch |err| { + allocator.free(dest_bin); + return err; + }; + return .{ .path = dest_bin, .version = local_binary.dev_local_version }; + } + + const known = registry.findKnownComponent(component) orelse return error.NoPlatformAsset; + var release = registry.fetchLatestRelease(allocator, known.repo) catch return error.FetchFailed; + defer release.deinit(); + + const platform_key = comptime platform.detect().toString(); + const asset = registry.findAssetForComponentPlatform(allocator, release.value, component, platform_key) orelse return error.NoPlatformAsset; + + const version = try allocator.dupe(u8, release.value.tag_name); + errdefer allocator.free(version); + const bin_path = try paths.binary(allocator, component, version); + errdefer allocator.free(bin_path); + + downloader.downloadIfMissing(allocator, asset.browser_download_url, bin_path) catch return error.DownloadFailed; + persistStartVersion(s, component, name, entry, version) catch |err| return err; + + return .{ .path = bin_path, .version = version, .version_owned = true }; +} + +fn resolveStartBinary( + allocator: std.mem.Allocator, + s: *state_mod.State, + paths: paths_mod.Paths, + component: []const u8, + name: []const u8, + entry: state_mod.InstanceEntry, +) !StartBinary { + if (std.mem.eql(u8, entry.version, "standalone")) { + return resolveStandaloneStartBinary(allocator, s, paths, component, name, entry); + } + + local_binary.refreshStagedDevLocal(allocator, paths, component, entry.version); + return .{ + .path = try paths.binary(allocator, component, entry.version), + .version = entry.version, + }; +} + +fn startBinaryError(err: anyerror) ApiResponse { + return switch (err) { + error.FetchFailed => .{ + .status = "502 Bad Gateway", + .content_type = "application/json", + .body = "{\"error\":\"failed to fetch latest release\"}", + }, + error.NoPlatformAsset => .{ + .status = "502 Bad Gateway", + .content_type = "application/json", + .body = "{\"error\":\"no platform asset for latest version\"}", + }, + error.DownloadFailed => .{ + .status = "502 Bad Gateway", + .content_type = "application/json", + .body = "{\"error\":\"failed to download latest binary\"}", + }, + else => helpers.serverError(), + }; +} + +fn isExternalStandaloneRunning( + allocator: std.mem.Allocator, + paths: paths_mod.Paths, + manager: *manager_mod.Manager, + component: []const u8, + name: []const u8, + entry: state_mod.InstanceEntry, +) bool { + if (manager.getStatus(component, name) != null) return false; + const snapshot = instance_runtime.resolve(allocator, paths, manager, component, name, entry); + return snapshot.status == .running; +} + +fn externalStandaloneConflict() ApiResponse { + return .{ + .status = "409 Conflict", + .content_type = "application/json", + .body = "{\"error\":\"instance is running outside nullhub supervision\"}", + }; +} + const FetchedJsonValue = struct { bytes: []u8, parsed: std.json.Parsed(std.json.Value), @@ -262,6 +398,17 @@ fn listNullBoilersLocked( return integration_mod.listNullBoilers(allocator, state, paths); } +fn listNullWatchLocked( + allocator: std.mem.Allocator, + mutex: *std_compat.sync.Mutex, + state: *state_mod.State, + paths: paths_mod.Paths, +) ![]integration_mod.NullWatchConfig { + mutex.lock(); + defer mutex.unlock(); + return integration_mod.listNullWatch(allocator, state, paths); +} + const PipelineSummary = struct { id: []const u8, name: []const u8, @@ -276,6 +423,19 @@ const TrackerIntegrationOption = struct { pipelines: []const PipelineSummary = &.{}, }; +const WatchIntegrationOption = struct { + name: []const u8, + host: []const u8, + port: u16, + running: bool, +}; + +const ClawIntegrationOption = struct { + name: []const u8, + running: bool, + linked: bool, +}; + fn fetchPipelineSummaries(allocator: std.mem.Allocator, url: []const u8, bearer_token: ?[]const u8) ?[]PipelineSummary { var client: std.http.Client = .{ .allocator = allocator, .io = std_compat.io() }; defer client.deinit(); @@ -1848,6 +2008,9 @@ pub fn handleGet(allocator: std.mem.Allocator, s: *state_mod.State, manager: *ma /// POST /api/instances/{component}/{name}/start pub fn handleStart(allocator: std.mem.Allocator, s: *state_mod.State, manager: *manager_mod.Manager, paths: paths_mod.Paths, component: []const u8, name: []const u8, body: []const u8) ApiResponse { const entry = s.getInstance(component, name) orelse return notFound(); + if (isExternalStandaloneRunning(allocator, paths, manager, component, name, entry)) { + return externalStandaloneConflict(); + } _ = nullclaw_web_channel.ensureNullclawWebChannelConfig( allocator, @@ -1872,6 +2035,7 @@ pub fn handleStart(allocator: std.mem.Allocator, s: *state_mod.State, manager: * }; var launch_cmd: []const u8 = entry.launch_mode; var launch_verbose = entry.verbose; + var launch_mode_overridden = false; var parsed_body: ?std.json.Parsed(StartBody) = null; defer if (parsed_body) |*pb| pb.deinit(); if (body.len > 0) { @@ -1882,21 +2046,26 @@ pub fn handleStart(allocator: std.mem.Allocator, s: *state_mod.State, manager: * .{ .allocate = .alloc_always, .ignore_unknown_fields = true }, ) catch null; if (parsed_body) |pb| { - if (pb.value.launch_mode) |mode| launch_cmd = mode; + if (pb.value.launch_mode) |mode| { + launch_cmd = mode; + launch_mode_overridden = true; + } if (pb.value.verbose) |verbose| launch_verbose = verbose; } } - local_binary.refreshStagedDevLocal(allocator, paths, component, entry.version); - - // Resolve binary path - const bin_path = paths.binary(allocator, component, entry.version) catch return helpers.serverError(); - defer allocator.free(bin_path); + const start_binary = resolveStartBinary(allocator, s, paths, component, name, entry) catch |err| return startBinaryError(err); + defer start_binary.deinit(allocator); + const bin_path = start_binary.path; + const current_version = start_binary.version; // Read manifest from binary to get health endpoint and port var health_endpoint: []const u8 = "/health"; var port: u16 = 0; var port_from_config: []const u8 = ""; + var manifest_launch_command: []const u8 = ""; + var manifest_launch_mode: ?[]const u8 = null; + defer if (manifest_launch_mode) |mode| allocator.free(mode); const manifest_json = component_cli.exportManifest(allocator, bin_path) catch null; var parsed_manifest: ?std.json.Parsed(manifest_mod.Manifest) = null; if (manifest_json) |mj| { @@ -1905,11 +2074,36 @@ pub fn handleStart(allocator: std.mem.Allocator, s: *state_mod.State, manager: * health_endpoint = pm.value.health.endpoint; port_from_config = pm.value.health.port_from_config; if (pm.value.ports.len > 0) port = pm.value.ports[0].default; + manifest_launch_command = pm.value.launch.command; + manifest_launch_mode = launch_args_mod.fromManifestLaunch( + allocator, + component, + pm.value.launch.command, + pm.value.launch.args, + ) catch null; } } defer if (manifest_json) |mj| allocator.free(mj); defer if (parsed_manifest) |*pm| pm.deinit(); + if (!launch_mode_overridden) { + if (manifest_launch_mode) |mode| { + const should_normalize_launch = + (std.mem.eql(u8, launch_cmd, manifest_launch_command) and !std.mem.eql(u8, launch_cmd, mode)) or + isLegacyDefaultLaunchMode(component, launch_cmd); + if (should_normalize_launch) { + launch_cmd = mode; + _ = s.updateInstance(component, name, .{ + .version = current_version, + .auto_start = entry.auto_start, + .launch_mode = launch_cmd, + .verbose = entry.verbose, + }) catch {}; + s.save() catch {}; + } + } + } + // Try to read actual port from instance config.json using port_from_config key if (port_from_config.len > 0) { if (instance_runtime.readPortFromConfig(allocator, paths, component, name, port_from_config)) |config_port| { @@ -1932,15 +2126,21 @@ pub fn handleStart(allocator: std.mem.Allocator, s: *state_mod.State, manager: * } /// POST /api/instances/{component}/{name}/stop -pub fn handleStop(s: *state_mod.State, manager: *manager_mod.Manager, component: []const u8, name: []const u8) ApiResponse { - _ = s.getInstance(component, name) orelse return notFound(); +pub fn handleStop(allocator: std.mem.Allocator, s: *state_mod.State, manager: *manager_mod.Manager, paths: paths_mod.Paths, component: []const u8, name: []const u8) ApiResponse { + const entry = s.getInstance(component, name) orelse return notFound(); + if (isExternalStandaloneRunning(allocator, paths, manager, component, name, entry)) { + return externalStandaloneConflict(); + } manager.stopInstance(component, name) catch return helpers.serverError(); return jsonOk("{\"status\":\"stopped\"}"); } /// POST /api/instances/{component}/{name}/restart pub fn handleRestart(allocator: std.mem.Allocator, s: *state_mod.State, manager: *manager_mod.Manager, paths: paths_mod.Paths, component: []const u8, name: []const u8, body: []const u8) ApiResponse { - _ = s.getInstance(component, name) orelse return notFound(); + const entry = s.getInstance(component, name) orelse return notFound(); + if (isExternalStandaloneRunning(allocator, paths, manager, component, name, entry)) { + return externalStandaloneConflict(); + } manager.stopInstance(component, name) catch {}; return handleStart(allocator, s, manager, paths, component, name, body); } @@ -3155,6 +3355,7 @@ pub fn handleImport(allocator: std.mem.Allocator, s: *state_mod.State, paths: pa s.addInstance(component, "default", .{ .version = version, .auto_start = false, + .launch_mode = defaultLaunchModeForComponent(component), .verbose = false, }) catch return helpers.serverError(); s.save() catch return helpers.serverError(); @@ -3212,6 +3413,94 @@ fn handleIntegrationGet( component: []const u8, name: []const u8, ) ApiResponse { + if (std.mem.eql(u8, component, "nullclaw")) { + var link = integration_mod.loadNullClawTelemetryLink(allocator, paths, name) catch |err| switch (err) { + error.NotFound => return notFound(), + else => return helpers.serverError(), + }; + defer link.deinit(allocator); + const watches = listNullWatchLocked(allocator, mutex, s, paths) catch return helpers.serverError(); + defer integration_mod.deinitNullWatchConfigs(allocator, watches); + const linked = integration_mod.findNullWatchByEndpoint(watches, link.endpoint); + + var watch_options: std.ArrayListUnmanaged(WatchIntegrationOption) = .empty; + defer watch_options.deinit(allocator); + for (watches) |watch| { + const is_running = blk: { + const status = getStatusLocked(mutex, manager, "nullwatch", watch.name) orelse break :blk false; + break :blk status.status == .running; + }; + watch_options.append(allocator, .{ + .name = watch.name, + .host = watch.host, + .port = watch.port, + .running = is_running, + }) catch return helpers.serverError(); + } + + const body = std.json.Stringify.valueAlloc(allocator, .{ + .kind = "nullclaw", + .configured = link.configured, + .linked_watch = if (linked) |watch| .{ + .name = watch.name, + .host = watch.host, + .port = watch.port, + } else null, + .available_watches = watch_options.items, + .current_link = if (link.endpoint) |endpoint| .{ + .endpoint = endpoint, + .service_name = link.service_name orelse "", + .auth_header = link.auth_configured, + .source_header = link.source_header_configured, + } else null, + }, .{ .emit_null_optional_fields = false }) catch return helpers.serverError(); + return jsonOk(body); + } + + if (std.mem.eql(u8, component, "nullwatch")) { + var watch_cfg = integration_mod.loadNullWatchConfig(allocator, paths, name) catch null orelse return notFound(); + defer integration_mod.deinitNullWatchConfig(allocator, &watch_cfg); + + const claw_names_opt = blk: { + mutex.lock(); + defer mutex.unlock(); + break :blk s.instanceNames("nullclaw") catch return helpers.serverError(); + }; + defer if (claw_names_opt) |claw_names| s.allocator.free(claw_names); + + var claw_options: std.ArrayListUnmanaged(ClawIntegrationOption) = .empty; + defer claw_options.deinit(allocator); + if (claw_names_opt) |claw_names| { + for (claw_names) |claw_name| { + const is_running = blk: { + const status = getStatusLocked(mutex, manager, "nullclaw", claw_name) orelse break :blk false; + break :blk status.status == .running; + }; + const is_linked = blk: { + var link = integration_mod.loadNullClawTelemetryLink(allocator, paths, claw_name) catch break :blk false; + defer link.deinit(allocator); + break :blk integration_mod.findNullWatchByEndpoint(&.{watch_cfg}, link.endpoint) != null; + }; + claw_options.append(allocator, .{ + .name = claw_name, + .running = is_running, + .linked = is_linked, + }) catch return helpers.serverError(); + } + } + + const body = std.json.Stringify.valueAlloc(allocator, .{ + .kind = "nullwatch", + .watch = .{ + .name = watch_cfg.name, + .host = watch_cfg.host, + .port = watch_cfg.port, + }, + .available_claws = claw_options.items, + }, .{ .emit_null_optional_fields = false }) catch return helpers.serverError(); + return jsonOk(body); + } + if (std.mem.eql(u8, component, "nullboiler")) { var boiler_cfg = integration_mod.loadNullBoilerConfig(allocator, paths, name) catch null orelse return notFound(); defer integration_mod.deinitNullBoilerConfig(allocator, &boiler_cfg); @@ -3363,6 +3652,31 @@ fn handleIntegrationGet( return notFound(); } +fn linkNullClawTelemetry( + allocator: std.mem.Allocator, + s: *state_mod.State, + manager: *manager_mod.Manager, + mutex: *std_compat.sync.Mutex, + paths: paths_mod.Paths, + claw_name: []const u8, + watch_cfg: integration_mod.NullWatchConfig, +) ApiResponse { + integration_mod.linkNullClawToNullWatch(allocator, paths, claw_name, watch_cfg) catch |err| switch (err) { + error.NotFound => return notFound(), + else => return helpers.serverError(), + }; + + if (getStatusLocked(mutex, manager, "nullclaw", claw_name)) |status| { + if (status.status == .running) { + mutex.lock(); + defer mutex.unlock(); + return handleRestart(allocator, s, manager, paths, "nullclaw", claw_name, ""); + } + } + + return jsonOk("{\"status\":\"linked\"}"); +} + fn handleIntegrationPost( allocator: std.mem.Allocator, s: *state_mod.State, @@ -3373,7 +3687,52 @@ fn handleIntegrationPost( name: []const u8, body: []const u8, ) ApiResponse { - if (!std.mem.eql(u8, component, "nullboiler")) return badRequest("{\"error\":\"integration updates are only supported for nullboiler\"}"); + if (std.mem.eql(u8, component, "nullclaw")) { + const watch_cfg = blk: { + const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }) catch return badRequest("{\"error\":\"invalid JSON body\"}"); + defer parsed.deinit(); + if (parsed.value != .object) return badRequest("{\"error\":\"invalid JSON body\"}"); + const watch_name = if (parsed.value.object.get("watch_instance")) |value| + if (value == .string and value.string.len > 0) value.string else null + else + null; + if (watch_name == null) return badRequest("{\"error\":\"watch_instance is required\"}"); + break :blk integration_mod.loadNullWatchConfig(allocator, paths, watch_name.?) catch null orelse return notFound(); + }; + defer { + var owned_cfg = watch_cfg; + integration_mod.deinitNullWatchConfig(allocator, &owned_cfg); + } + + return linkNullClawTelemetry(allocator, s, manager, mutex, paths, name, watch_cfg); + } + + if (std.mem.eql(u8, component, "nullwatch")) { + const claw_name = blk: { + const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }) catch return badRequest("{\"error\":\"invalid JSON body\"}"); + defer parsed.deinit(); + if (parsed.value != .object) return badRequest("{\"error\":\"invalid JSON body\"}"); + const value = if (parsed.value.object.get("claw_instance")) |item| + if (item == .string and item.string.len > 0) item.string else null + else + null; + if (value == null) return badRequest("{\"error\":\"claw_instance is required\"}"); + break :blk allocator.dupe(u8, value.?) catch return helpers.serverError(); + }; + defer allocator.free(claw_name); + + var watch_cfg = integration_mod.loadNullWatchConfig(allocator, paths, name) catch null orelse return notFound(); + defer integration_mod.deinitNullWatchConfig(allocator, &watch_cfg); + return linkNullClawTelemetry(allocator, s, manager, mutex, paths, claw_name, watch_cfg); + } + + if (!std.mem.eql(u8, component, "nullboiler")) return badRequest("{\"error\":\"integration updates are only supported for nullclaw, nullwatch, and nullboiler\"}"); const tracker_cfg = blk: { const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{ @@ -3780,7 +4139,7 @@ pub fn dispatch( if (!std.mem.eql(u8, method, "POST")) return methodNotAllowed(); if (std.mem.eql(u8, action, "start")) return handleStart(allocator, s, manager, paths, parsed.component, parsed.name, body); - if (std.mem.eql(u8, action, "stop")) return handleStop(s, manager, parsed.component, parsed.name); + if (std.mem.eql(u8, action, "stop")) return handleStop(allocator, s, manager, paths, parsed.component, parsed.name); if (std.mem.eql(u8, action, "restart")) return handleRestart(allocator, s, manager, paths, parsed.component, parsed.name, body); return notFound(); @@ -3824,6 +4183,15 @@ const TestManagerCtx = struct { } }; +test "component default launch mode uses registry metadata" { + try std.testing.expectEqualStrings("gateway", defaultLaunchModeForComponent("nullclaw")); + try std.testing.expectEqualStrings("serve", defaultLaunchModeForComponent("nullwatch")); + try std.testing.expectEqualStrings("gateway", defaultLaunchModeForComponent("unknown-component")); + try std.testing.expect(isLegacyDefaultLaunchMode("nullwatch", "gateway")); + try std.testing.expect(!isLegacyDefaultLaunchMode("nullwatch", "serve")); + try std.testing.expect(!isLegacyDefaultLaunchMode("nullclaw", "gateway")); +} + fn writeTestInstanceConfig( allocator: std.mem.Allocator, paths: paths_mod.Paths, @@ -4288,6 +4656,93 @@ test "handleStart keeps gateway instances on their HTTP health port" { mctx.manager.stopInstance("nullclaw", "my-agent") catch {}; } +test "handleStart normalizes manifest binary command to runnable launch args" { + if (comptime builtin.os.tag == .windows) return error.SkipZigTest; + + const allocator = std.testing.allocator; + var state_fixture = try test_helpers.TempPaths.init(allocator); + defer state_fixture.deinit(); + const state_path = try state_fixture.paths.state(allocator); + defer allocator.free(state_path); + var s = state_mod.State.init(allocator, state_path); + defer s.deinit(); + var mctx = TestManagerCtx.init(allocator); + defer mctx.deinit(allocator); + + try s.addInstance("nullwatch", "watch", .{ .version = "1.0.0", .launch_mode = "nullwatch" }); + try writeTestInstanceConfig(allocator, mctx.paths, "nullwatch", "watch", "{\"port\":43124}"); + try writeTestBinary( + allocator, + mctx.paths, + "nullwatch", + "1.0.0", + \\#!/bin/sh + \\set -eu + \\if [ "$1" = "--export-manifest" ]; then + \\ printf '%s\n' '{"launch":{"command":"nullwatch","args":["serve"]},"health":{"endpoint":"/health","port_from_config":"port"},"ports":[{"name":"api","config_key":"port","default":7710,"protocol":"http"}]}' + \\ exit 0 + \\fi + \\sleep 60 + , + ); + + const resp = handleStart(allocator, &s, &mctx.manager, mctx.paths, "nullwatch", "watch", ""); + try std.testing.expectEqualStrings("200 OK", resp.status); + + const entry = s.getInstance("nullwatch", "watch").?; + try std.testing.expectEqualStrings("serve", entry.launch_mode); + + const status = mctx.manager.getStatus("nullwatch", "watch").?; + try std.testing.expectEqual(manager_mod.Status.starting, status.status); + try std.testing.expectEqual(@as(u16, 43124), status.port); + const inst = mctx.manager.instances.get("nullwatch/watch").?; + try std.testing.expectEqual(@as(usize, 1), inst.launch_args.len); + try std.testing.expectEqualStrings("serve", inst.launch_args[0]); + + mctx.manager.stopInstance("nullwatch", "watch") catch {}; +} + +test "handleStart normalizes legacy default launch mode for nullwatch" { + if (comptime builtin.os.tag == .windows) return error.SkipZigTest; + + const allocator = std.testing.allocator; + var state_fixture = try test_helpers.TempPaths.init(allocator); + defer state_fixture.deinit(); + const state_path = try state_fixture.paths.state(allocator); + defer allocator.free(state_path); + var s = state_mod.State.init(allocator, state_path); + defer s.deinit(); + var mctx = TestManagerCtx.init(allocator); + defer mctx.deinit(allocator); + + try s.addInstance("nullwatch", "watch", .{ .version = "1.0.0", .launch_mode = "gateway" }); + try writeTestInstanceConfig(allocator, mctx.paths, "nullwatch", "watch", "{\"port\":43125}"); + try writeTestBinary( + allocator, + mctx.paths, + "nullwatch", + "1.0.0", + \\#!/bin/sh + \\set -eu + \\if [ "$1" = "--export-manifest" ]; then + \\ printf '%s\n' '{"launch":{"command":"nullwatch","args":["serve"]},"health":{"endpoint":"/health","port_from_config":"port"},"ports":[{"name":"api","config_key":"port","default":7710,"protocol":"http"}]}' + \\ exit 0 + \\fi + \\sleep 60 + , + ); + + const resp = handleStart(allocator, &s, &mctx.manager, mctx.paths, "nullwatch", "watch", ""); + try std.testing.expectEqualStrings("200 OK", resp.status); + + const entry = s.getInstance("nullwatch", "watch").?; + try std.testing.expectEqualStrings("serve", entry.launch_mode); + const inst = mctx.manager.instances.get("nullwatch/watch").?; + try std.testing.expectEqualStrings("serve", inst.launch_args[0]); + + mctx.manager.stopInstance("nullwatch", "watch") catch {}; +} + test "handleStop returns 200 for existing instance" { const allocator = std.testing.allocator; var state_fixture = try test_helpers.TempPaths.init(allocator); @@ -4301,7 +4756,7 @@ test "handleStop returns 200 for existing instance" { try s.addInstance("nullclaw", "my-agent", .{ .version = "1.0.0" }); - const resp = handleStop(&s, &mctx.manager, "nullclaw", "my-agent"); + const resp = handleStop(allocator, &s, &mctx.manager, mctx.paths, "nullclaw", "my-agent"); try std.testing.expectEqualStrings("200 OK", resp.status); try std.testing.expectEqualStrings("{\"status\":\"stopped\"}", resp.body); } @@ -5082,6 +5537,204 @@ test "dispatch routes GET integration action for linked nullboiler" { try std.testing.expectEqual(@as(i64, 2), current_link.get("max_concurrent_tasks").?.integer); } +test "dispatch routes GET integration action for nullclaw nullwatch telemetry" { + const allocator = std.testing.allocator; + var state_fixture = try test_helpers.TempPaths.init(allocator); + defer state_fixture.deinit(); + const state_path = try state_fixture.paths.state(allocator); + defer allocator.free(state_path); + var s = state_mod.State.init(allocator, state_path); + defer s.deinit(); + var mctx = TestManagerCtx.init(allocator); + defer mctx.deinit(allocator); + + try s.addInstance("nullwatch", "observer-a", .{ .version = "1.0.0" }); + try s.addInstance("nullclaw", "my-agent", .{ .version = "1.0.0" }); + + try writeTestInstanceConfig(allocator, mctx.paths, "nullwatch", "observer-a", "{\"host\":\"127.0.0.1\",\"port\":7711,\"api_token\":\"watch-token\"}"); + try writeTestInstanceConfig( + allocator, + mctx.paths, + "nullclaw", + "my-agent", + "{\"diagnostics\":{\"backend\":\"otel\",\"otel\":{\"endpoint\":\"http://127.0.0.1:7711\",\"service_name\":\"nullclaw/my-agent\",\"headers\":{\"Authorization\":\"Bearer watch-token\",\"x-nullwatch-source\":\"nullclaw\"}}}}", + ); + + const resp = dispatch(allocator, &s, &mctx.manager, &mctx.mutex, mctx.paths, "GET", "/api/instances/nullclaw/my-agent/integration", "").?; + defer allocator.free(resp.body); + + try std.testing.expectEqualStrings("200 OK", resp.status); + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, resp.body, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }); + defer parsed.deinit(); + + try std.testing.expectEqualStrings("nullclaw", parsed.value.object.get("kind").?.string); + try std.testing.expect(parsed.value.object.get("configured").?.bool); + const linked = parsed.value.object.get("linked_watch").?.object; + try std.testing.expectEqualStrings("observer-a", linked.get("name").?.string); + try std.testing.expectEqual(@as(i64, 7711), linked.get("port").?.integer); + const current_link = parsed.value.object.get("current_link").?.object; + try std.testing.expectEqualStrings("http://127.0.0.1:7711", current_link.get("endpoint").?.string); + try std.testing.expectEqualStrings("nullclaw/my-agent", current_link.get("service_name").?.string); + try std.testing.expect(current_link.get("auth_header").?.bool); + try std.testing.expect(current_link.get("source_header").?.bool); + try std.testing.expectEqual(@as(usize, 1), parsed.value.object.get("available_watches").?.array.items.len); +} + +test "dispatch routes POST integration action for nullclaw links nullwatch" { + const allocator = std.testing.allocator; + var state_fixture = try test_helpers.TempPaths.init(allocator); + defer state_fixture.deinit(); + const state_path = try state_fixture.paths.state(allocator); + defer allocator.free(state_path); + var s = state_mod.State.init(allocator, state_path); + defer s.deinit(); + var mctx = TestManagerCtx.init(allocator); + defer mctx.deinit(allocator); + + try s.addInstance("nullwatch", "observer-a", .{ .version = "1.0.0" }); + try s.addInstance("nullclaw", "my-agent", .{ .version = "1.0.0" }); + + try writeTestInstanceConfig(allocator, mctx.paths, "nullwatch", "observer-a", "{\"host\":\"0.0.0.0\",\"port\":7712,\"api_token\":\"watch-token\"}"); + try writeTestInstanceConfig( + allocator, + mctx.paths, + "nullclaw", + "my-agent", + "{\"diagnostics\":{\"backend\":\"jsonl\",\"log_tool_calls\":true,\"otel\":{\"service_name\":\"nullclaw\",\"headers\":{\"Authorization\":\"Bearer old\"}}}}", + ); + + const resp = dispatch( + allocator, + &s, + &mctx.manager, + &mctx.mutex, + mctx.paths, + "POST", + "/api/instances/nullclaw/my-agent/integration", + "{\"watch_instance\":\"observer-a\"}", + ).?; + try std.testing.expectEqualStrings("200 OK", resp.status); + + const config_path = try mctx.paths.instanceConfig(allocator, "nullclaw", "my-agent"); + defer allocator.free(config_path); + const config_bytes = try std.fs.readFileAbsolute(allocator, config_path, 1024 * 1024); + defer allocator.free(config_bytes); + + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, config_bytes, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }); + defer parsed.deinit(); + + const diagnostics = parsed.value.object.get("diagnostics").?.object; + try std.testing.expectEqualStrings("otel", diagnostics.get("backend").?.string); + try std.testing.expect(diagnostics.get("log_tool_calls").?.bool); + const otel = diagnostics.get("otel").?.object; + try std.testing.expectEqualStrings("http://127.0.0.1:7712", otel.get("endpoint").?.string); + try std.testing.expectEqualStrings("nullclaw/my-agent", otel.get("service_name").?.string); + const headers = otel.get("headers").?.object; + try std.testing.expectEqualStrings("Bearer watch-token", headers.get("Authorization").?.string); + try std.testing.expectEqualStrings("nullclaw", headers.get("x-nullwatch-source").?.string); +} + +test "dispatch routes GET integration action for nullwatch lists linked nullclaws" { + const allocator = std.testing.allocator; + var state_fixture = try test_helpers.TempPaths.init(allocator); + defer state_fixture.deinit(); + const state_path = try state_fixture.paths.state(allocator); + defer allocator.free(state_path); + var s = state_mod.State.init(allocator, state_path); + defer s.deinit(); + var mctx = TestManagerCtx.init(allocator); + defer mctx.deinit(allocator); + + try s.addInstance("nullwatch", "observer-a", .{ .version = "1.0.0" }); + try s.addInstance("nullclaw", "linked-agent", .{ .version = "1.0.0" }); + try s.addInstance("nullclaw", "plain-agent", .{ .version = "1.0.0" }); + + try writeTestInstanceConfig(allocator, mctx.paths, "nullwatch", "observer-a", "{\"port\":7711}"); + try writeTestInstanceConfig(allocator, mctx.paths, "nullclaw", "linked-agent", "{\"diagnostics\":{\"backend\":\"otel\",\"otel\":{\"endpoint\":\"http://127.0.0.1:7711\",\"service_name\":\"nullclaw/linked-agent\"}}}"); + try writeTestInstanceConfig(allocator, mctx.paths, "nullclaw", "plain-agent", "{\"diagnostics\":{\"backend\":\"jsonl\"}}"); + + const resp = dispatch(allocator, &s, &mctx.manager, &mctx.mutex, mctx.paths, "GET", "/api/instances/nullwatch/observer-a/integration", "").?; + defer allocator.free(resp.body); + try std.testing.expectEqualStrings("200 OK", resp.status); + + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, resp.body, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }); + defer parsed.deinit(); + + try std.testing.expectEqualStrings("nullwatch", parsed.value.object.get("kind").?.string); + const claws = parsed.value.object.get("available_claws").?.array.items; + try std.testing.expectEqual(@as(usize, 2), claws.len); + + var linked_found = false; + var plain_found = false; + for (claws) |claw| { + const obj = claw.object; + if (std.mem.eql(u8, obj.get("name").?.string, "linked-agent")) { + linked_found = obj.get("linked").?.bool; + } + if (std.mem.eql(u8, obj.get("name").?.string, "plain-agent")) { + plain_found = !obj.get("linked").?.bool; + } + } + try std.testing.expect(linked_found); + try std.testing.expect(plain_found); +} + +test "dispatch routes POST integration action for nullwatch links selected nullclaw" { + const allocator = std.testing.allocator; + var state_fixture = try test_helpers.TempPaths.init(allocator); + defer state_fixture.deinit(); + const state_path = try state_fixture.paths.state(allocator); + defer allocator.free(state_path); + var s = state_mod.State.init(allocator, state_path); + defer s.deinit(); + var mctx = TestManagerCtx.init(allocator); + defer mctx.deinit(allocator); + + try s.addInstance("nullwatch", "observer-a", .{ .version = "1.0.0" }); + try s.addInstance("nullclaw", "my-agent", .{ .version = "1.0.0" }); + + try writeTestInstanceConfig(allocator, mctx.paths, "nullwatch", "observer-a", "{\"port\":7713}"); + try writeTestInstanceConfig(allocator, mctx.paths, "nullclaw", "my-agent", "{\"diagnostics\":{\"backend\":\"jsonl\"}}"); + + const resp = dispatch( + allocator, + &s, + &mctx.manager, + &mctx.mutex, + mctx.paths, + "POST", + "/api/instances/nullwatch/observer-a/integration", + "{\"claw_instance\":\"my-agent\"}", + ).?; + try std.testing.expectEqualStrings("200 OK", resp.status); + + const config_path = try mctx.paths.instanceConfig(allocator, "nullclaw", "my-agent"); + defer allocator.free(config_path); + const config_bytes = try std.fs.readFileAbsolute(allocator, config_path, 1024 * 1024); + defer allocator.free(config_bytes); + + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, config_bytes, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }); + defer parsed.deinit(); + + const diagnostics = parsed.value.object.get("diagnostics").?.object; + try std.testing.expectEqualStrings("otel", diagnostics.get("backend").?.string); + const otel = diagnostics.get("otel").?.object; + try std.testing.expectEqualStrings("http://127.0.0.1:7713", otel.get("endpoint").?.string); + try std.testing.expectEqualStrings("nullclaw/my-agent", otel.get("service_name").?.string); +} + test "dispatch routes POST integration action for nullboiler" { const allocator = std.testing.allocator; var state_fixture = try test_helpers.TempPaths.init(allocator); diff --git a/src/api/meta.zig b/src/api/meta.zig index 56bb876..0313e21 100644 --- a/src/api/meta.zig +++ b/src/api/meta.zig @@ -1061,7 +1061,7 @@ const routes = [_]RouteSpec{ .method = "GET", .path_template = "/api/instances/{component}/{name}/integration", .category = "instances", - .summary = "Read integration status for linked orchestration and tracker components.", + .summary = "Read integration status for linked telemetry, orchestration, and tracker components.", .auth_mode = "optional_bearer", .path_params = common_instance_params[0..], .response = "Integration status and linkage payload.", @@ -1071,7 +1071,7 @@ const routes = [_]RouteSpec{ .method = "POST", .path_template = "/api/instances/{component}/{name}/integration", .category = "instances", - .summary = "Link or relink supported components such as nullboiler and nulltickets.", + .summary = "Link or relink supported components such as nullclaw, nullwatch, nullboiler, and nulltickets.", .auth_mode = "optional_bearer", .path_params = common_instance_params[0..], .body = "Integration update payload.", @@ -1333,6 +1333,16 @@ const routes = [_]RouteSpec{ .body = "Forwarded as-is to the orchestration backend.", .response = "Forwarded upstream JSON response.", }, + .{ + .id = "observability.proxy", + .method = "ANY", + .path_template = "/api/observability/{...}", + .category = "observability", + .summary = "Proxy observability requests to a managed or configured NullWatch instance.", + .auth_mode = "optional_bearer", + .body = "Forwarded as-is to NullWatch.", + .response = "Forwarded upstream JSON response.", + }, }; pub fn allRoutes() []const RouteSpec { diff --git a/src/api/observability.zig b/src/api/observability.zig new file mode 100644 index 0000000..bec5071 --- /dev/null +++ b/src/api/observability.zig @@ -0,0 +1,120 @@ +const std = @import("std"); +const http_proxy = @import("proxy.zig"); +const query = @import("query.zig"); + +const Allocator = std.mem.Allocator; + +const Response = http_proxy.Response; + +const prefix = "/api/observability"; + +pub const Config = struct { + watch_url: ?[]const u8 = null, + watch_token: ?[]const u8 = null, +}; + +pub fn isProxyPath(target: []const u8) bool { + return http_proxy.isPathInNamespace(target, prefix) or + (target.len > prefix.len and + std.mem.startsWith(u8, target, prefix) and + target[prefix.len] == '?'); +} + +pub fn selectedWatchNameAlloc(allocator: Allocator, target: []const u8) !?[]u8 { + return try query.valueAlloc(allocator, target, "nullhub_watch"); +} + +fn isSelectorParam(param: []const u8) bool { + const key = if (std.mem.indexOfScalar(u8, param, '=')) |idx| param[0..idx] else param; + return std.mem.eql(u8, key, "nullhub_watch"); +} + +fn stripSelectorParamsAlloc(allocator: Allocator, target: []const u8) ![]u8 { + const qmark = std.mem.indexOfScalar(u8, target, '?') orelse return allocator.dupe(u8, target); + + var buf = std.array_list.Managed(u8).init(allocator); + errdefer buf.deinit(); + try buf.appendSlice(target[0..qmark]); + + var wrote_query = false; + var params = std.mem.splitScalar(u8, target[qmark + 1 ..], '&'); + while (params.next()) |param| { + if (param.len == 0 or isSelectorParam(param)) continue; + try buf.append(if (wrote_query) '&' else '?'); + wrote_query = true; + try buf.appendSlice(param); + } + + return buf.toOwnedSlice(); +} + +/// Proxies observability API requests to a managed or configured NullWatch instance. +/// The shared `/api/observability` prefix is stripped before forwarding, so +/// `/api/observability/v1/runs` becomes `/v1/runs` on NullWatch. +pub fn handle(allocator: Allocator, method: []const u8, target: []const u8, body: []const u8, cfg: Config) Response { + if (!isProxyPath(target)) { + return .{ .status = "404 Not Found", .content_type = "application/json", .body = "{\"error\":\"not found\"}" }; + } + + const base_url = cfg.watch_url orelse + return .{ .status = "503 Service Unavailable", .content_type = "application/json", .body = "{\"error\":\"NullWatch not configured\"}" }; + + const forward_target = stripSelectorParamsAlloc(allocator, target) catch + return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; + defer allocator.free(forward_target); + + const proxied_path = forward_target[prefix.len..]; + const path = if (proxied_path.len == 0) "/v1/summary" else proxied_path; + return http_proxy.forward(allocator, .{ + .method = method, + .base_url = base_url, + .path = path, + .body = body, + .bearer_token = cfg.watch_token, + .unreachable_body = "{\"error\":\"NullWatch unreachable\"}", + }); +} + +test "isProxyPath matches observability namespace" { + try std.testing.expect(isProxyPath("/api/observability")); + try std.testing.expect(isProxyPath("/api/observability?watch=default")); + try std.testing.expect(isProxyPath("/api/observability/v1/runs")); + try std.testing.expect(isProxyPath("/api/observability/health")); + try std.testing.expect(!isProxyPath("/api/orchestration/v1/runs")); +} + +test "handle returns not configured without NullWatch URL" { + const resp = handle(std.testing.allocator, "GET", "/api/observability/v1/summary", "", .{}); + try std.testing.expectEqualStrings("503 Service Unavailable", resp.status); + try std.testing.expectEqualStrings("{\"error\":\"NullWatch not configured\"}", resp.body); +} + +test "handle rejects non-observability paths" { + const resp = handle(std.testing.allocator, "GET", "/api/status", "", .{ + .watch_url = "http://127.0.0.1:7710", + }); + try std.testing.expectEqualStrings("404 Not Found", resp.status); +} + +test "selectedWatchNameAlloc reads hub selector query params" { + const allocator = std.testing.allocator; + const selected = (try selectedWatchNameAlloc(allocator, "/api/observability/v1/runs?limit=1&nullhub_watch=watch+one")).?; + defer allocator.free(selected); + try std.testing.expectEqualStrings("watch one", selected); + try std.testing.expect((try selectedWatchNameAlloc(allocator, "/api/observability/v1/runs?watch=upstream")) == null); +} + +test "stripSelectorParamsAlloc removes only NullHub watch selector" { + const allocator = std.testing.allocator; + const stripped = try stripSelectorParamsAlloc(allocator, "/api/observability/v1/runs?limit=50&nullhub_watch=alpha&status=ok"); + defer allocator.free(stripped); + try std.testing.expectEqualStrings("/api/observability/v1/runs?limit=50&status=ok", stripped); + + const root = try stripSelectorParamsAlloc(allocator, "/api/observability?nullhub_watch=alpha"); + defer allocator.free(root); + try std.testing.expectEqualStrings("/api/observability", root); + + const upstream_filter = try stripSelectorParamsAlloc(allocator, "/api/observability/v1/runs?watch=alpha&instance=demo"); + defer allocator.free(upstream_filter); + try std.testing.expectEqualStrings("/api/observability/v1/runs?watch=alpha&instance=demo", upstream_filter); +} diff --git a/src/api/orchestration.zig b/src/api/orchestration.zig index 290edca..8de6b06 100644 --- a/src/api/orchestration.zig +++ b/src/api/orchestration.zig @@ -1,12 +1,9 @@ const std = @import("std"); const std_compat = @import("compat"); +const http_proxy = @import("proxy.zig"); const Allocator = std.mem.Allocator; -const Response = struct { - status: []const u8, - content_type: []const u8, - body: []const u8, -}; +const Response = http_proxy.Response; const prefix = "/api/orchestration"; const store_prefix = "/api/orchestration/store"; @@ -38,7 +35,7 @@ const Backend = enum { }; pub fn isProxyPath(target: []const u8) bool { - return std.mem.eql(u8, target, prefix) or std.mem.startsWith(u8, target, prefix ++ "/"); + return http_proxy.isPathInNamespace(target, prefix); } fn isStorePath(target: []const u8) bool { @@ -90,81 +87,17 @@ pub fn handle(allocator: Allocator, method: []const u8, target: []const u8, body const resolved = resolveProxyTarget(target, cfg) orelse return .{ .status = "503 Service Unavailable", .content_type = "application/json", .body = backend.notConfiguredBody() }; - const http_method = parseMethod(method) orelse - return .{ .status = "405 Method Not Allowed", .content_type = "application/json", .body = "{\"error\":\"method not allowed\"}" }; - const proxied_path = target[prefix.len..]; const path = if (proxied_path.len == 0) "/" else proxied_path; - const url = std.fmt.allocPrint(allocator, "{s}{s}", .{ resolved.base_url, path }) catch - return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; - defer allocator.free(url); - - var auth_header: ?[]const u8 = null; - defer if (auth_header) |value| allocator.free(value); - var header_buf: [1]std.http.Header = undefined; - const extra_headers: []const std.http.Header = if (resolved.token) |token| blk: { - auth_header = std.fmt.allocPrint(allocator, "Bearer {s}", .{token}) catch - return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; - header_buf[0] = .{ .name = "Authorization", .value = auth_header.? }; - break :blk header_buf[0..1]; - } else &.{}; - - var client: std.http.Client = .{ .allocator = allocator, .io = std_compat.io() }; - defer client.deinit(); - - var response_body: std.Io.Writer.Allocating = .init(allocator); - defer response_body.deinit(); - - const result = client.fetch(.{ - .location = .{ .url = url }, - .method = http_method, - .payload = if (body.len > 0) body else null, - .response_writer = &response_body.writer, - .extra_headers = extra_headers, - }) catch { - return .{ .status = "502 Bad Gateway", .content_type = "application/json", .body = resolved.backend.unreachableBody() }; - }; - - const status_code: u10 = @intFromEnum(result.status); - const resp_body = response_body.toOwnedSlice() catch - return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; - - const status = mapStatus(status_code); - - return .{ - .status = status, - .content_type = "application/json", - .body = resp_body, - }; -} - -fn parseMethod(method: []const u8) ?std.http.Method { - if (std.mem.eql(u8, method, "GET")) return .GET; - if (std.mem.eql(u8, method, "POST")) return .POST; - if (std.mem.eql(u8, method, "PUT")) return .PUT; - if (std.mem.eql(u8, method, "DELETE")) return .DELETE; - if (std.mem.eql(u8, method, "PATCH")) return .PATCH; - return null; -} - -fn mapStatus(code: u10) []const u8 { - return switch (code) { - 200 => "200 OK", - 201 => "201 Created", - 204 => "204 No Content", - 400 => "400 Bad Request", - 401 => "401 Unauthorized", - 403 => "403 Forbidden", - 404 => "404 Not Found", - 405 => "405 Method Not Allowed", - 409 => "409 Conflict", - 422 => "422 Unprocessable Entity", - 500 => "500 Internal Server Error", - 502 => "502 Bad Gateway", - 503 => "503 Service Unavailable", - else => if (code >= 200 and code < 300) "200 OK" else if (code >= 400 and code < 500) "400 Bad Request" else "500 Internal Server Error", - }; + return http_proxy.forward(allocator, .{ + .method = method, + .base_url = resolved.base_url, + .path = path, + .body = body, + .bearer_token = resolved.token, + .unreachable_body = resolved.backend.unreachableBody(), + }); } const TestUpstream = struct { diff --git a/src/api/proxy.zig b/src/api/proxy.zig new file mode 100644 index 0000000..7d9d11b --- /dev/null +++ b/src/api/proxy.zig @@ -0,0 +1,108 @@ +const std = @import("std"); +const std_compat = @import("compat"); + +const Allocator = std.mem.Allocator; + +pub const Response = struct { + status: []const u8, + content_type: []const u8, + body: []const u8, +}; + +pub const ForwardOptions = struct { + method: []const u8, + base_url: []const u8, + path: []const u8, + body: []const u8, + bearer_token: ?[]const u8 = null, + unreachable_body: []const u8 = "{\"error\":\"upstream unreachable\"}", +}; + +pub fn isPathInNamespace(target: []const u8, prefix: []const u8) bool { + return std.mem.eql(u8, target, prefix) or + (target.len > prefix.len and + std.mem.startsWith(u8, target, prefix) and + target[prefix.len] == '/'); +} + +pub fn forward(allocator: Allocator, opts: ForwardOptions) Response { + const http_method = parseMethod(opts.method) orelse + return .{ .status = "405 Method Not Allowed", .content_type = "application/json", .body = "{\"error\":\"method not allowed\"}" }; + + const url = std.fmt.allocPrint(allocator, "{s}{s}", .{ opts.base_url, opts.path }) catch + return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; + defer allocator.free(url); + + var auth_header: ?[]const u8 = null; + defer if (auth_header) |value| allocator.free(value); + var header_buf: [1]std.http.Header = undefined; + const extra_headers: []const std.http.Header = if (opts.bearer_token) |token| blk: { + auth_header = std.fmt.allocPrint(allocator, "Bearer {s}", .{token}) catch + return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; + header_buf[0] = .{ .name = "Authorization", .value = auth_header.? }; + break :blk header_buf[0..1]; + } else &.{}; + + var client: std.http.Client = .{ .allocator = allocator, .io = std_compat.io() }; + defer client.deinit(); + + var response_body: std.Io.Writer.Allocating = .init(allocator); + defer response_body.deinit(); + + const result = client.fetch(.{ + .location = .{ .url = url }, + .method = http_method, + .payload = if (opts.body.len > 0) opts.body else null, + .response_writer = &response_body.writer, + .extra_headers = extra_headers, + }) catch { + return .{ .status = "502 Bad Gateway", .content_type = "application/json", .body = opts.unreachable_body }; + }; + + const resp_body = response_body.toOwnedSlice() catch + return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; + + return .{ + .status = mapStatus(@intFromEnum(result.status)), + .content_type = "application/json", + .body = resp_body, + }; +} + +fn parseMethod(method: []const u8) ?std.http.Method { + if (std.mem.eql(u8, method, "GET")) return .GET; + if (std.mem.eql(u8, method, "POST")) return .POST; + if (std.mem.eql(u8, method, "PUT")) return .PUT; + if (std.mem.eql(u8, method, "DELETE")) return .DELETE; + if (std.mem.eql(u8, method, "PATCH")) return .PATCH; + return null; +} + +fn mapStatus(code: u10) []const u8 { + return switch (code) { + 200 => "200 OK", + 201 => "201 Created", + 204 => "204 No Content", + 400 => "400 Bad Request", + 401 => "401 Unauthorized", + 403 => "403 Forbidden", + 404 => "404 Not Found", + 405 => "405 Method Not Allowed", + 409 => "409 Conflict", + 415 => "415 Unsupported Media Type", + 422 => "422 Unprocessable Entity", + 500 => "500 Internal Server Error", + 502 => "502 Bad Gateway", + 503 => "503 Service Unavailable", + else => if (code >= 200 and code < 300) "200 OK" else if (code >= 400 and code < 500) "400 Bad Request" else "500 Internal Server Error", + }; +} + +test "isPathInNamespace matches exact and slash-delimited paths" { + try std.testing.expect(isPathInNamespace("/api/observability", "/api/observability")); + try std.testing.expect(isPathInNamespace("/api/observability/v1/runs", "/api/observability")); + try std.testing.expect(isPathInNamespace("/api/observability/v1/runs?limit=1", "/api/observability")); + try std.testing.expect(!isPathInNamespace("/api/observability?limit=1", "/api/observability")); + try std.testing.expect(!isPathInNamespace("/api/observability-extra", "/api/observability")); + try std.testing.expect(!isPathInNamespace("/api/orchestration", "/api/observability")); +} diff --git a/src/api/updates.zig b/src/api/updates.zig index 7434c5c..66f0750 100644 --- a/src/api/updates.zig +++ b/src/api/updates.zig @@ -57,6 +57,16 @@ fn versionsEqual(a: []const u8, b: []const u8) bool { return std.mem.eql(u8, stripV(a), stripV(b)); } +fn normalizedLaunchModeForUpdate(component: []const u8, launch_mode: []const u8, known: registry.KnownComponent) []const u8 { + if (!std.mem.eql(u8, known.default_launch_command, "gateway") and std.mem.eql(u8, launch_mode, "gateway")) { + return known.default_launch_command; + } + if (std.mem.eql(u8, component, "nullwatch") and std.mem.eql(u8, launch_mode, "nullwatch")) { + return known.default_launch_command; + } + return launch_mode; +} + fn fetchLatestTagForComponent(allocator: std.mem.Allocator, component: []const u8) ?[]u8 { if (builtin.is_test) return null; @@ -197,7 +207,8 @@ pub fn handleApplyUpdateRuntime( const inst_dir = paths.instanceDir(allocator, component, name) catch return serverError(); defer allocator.free(inst_dir); - var launch = launch_args_mod.resolve(allocator, entry.launch_mode, entry.verbose) catch return serverError(); + const launch_mode = normalizedLaunchModeForUpdate(component, entry.launch_mode, known); + var launch = launch_args_mod.resolve(allocator, launch_mode, entry.verbose) catch return serverError(); defer launch.deinit(); const effective_port = launch.effectiveHealthPort(port); @@ -253,7 +264,7 @@ pub fn handleApplyUpdateRuntime( const updated = s.updateInstance(component, name, .{ .version = latest_tag, .auto_start = entry.auto_start, - .launch_mode = entry.launch_mode, + .launch_mode = launch_mode, .verbose = entry.verbose, }) catch return serverError(); if (!updated) return notFound(); @@ -394,6 +405,13 @@ test "handleApplyUpdate returns success for existing instance" { try std.testing.expectEqualStrings("Update initiated", parsed.value.message); } +test "normalizedLaunchModeForUpdate maps legacy nullwatch launch modes to serve" { + const known = registry.findKnownComponent("nullwatch").?; + try std.testing.expectEqualStrings("serve", normalizedLaunchModeForUpdate("nullwatch", "gateway", known)); + try std.testing.expectEqualStrings("serve", normalizedLaunchModeForUpdate("nullwatch", "nullwatch", known)); + try std.testing.expectEqualStrings("serve", normalizedLaunchModeForUpdate("nullwatch", "serve", known)); +} + test "parseUpdatePath extracts component and name correctly" { const p = parseUpdatePath("/api/instances/nullclaw/my-agent/update").?; try std.testing.expectEqualStrings("nullclaw", p.component); diff --git a/src/core/component_cli.zig b/src/core/component_cli.zig index 8cc38ee..b4de226 100644 --- a/src/core/component_cli.zig +++ b/src/core/component_cli.zig @@ -21,6 +21,7 @@ pub fn homeEnvVarForComponent(component_name: []const u8) ?[]const u8 { if (std.mem.eql(u8, component_name, "nullclaw")) return "NULLCLAW_HOME"; if (std.mem.eql(u8, component_name, "nullboiler")) return "NULLBOILER_HOME"; if (std.mem.eql(u8, component_name, "nulltickets")) return "NULLTICKETS_HOME"; + if (std.mem.eql(u8, component_name, "nullwatch")) return "NULLWATCH_HOME"; return null; } @@ -135,3 +136,11 @@ pub fn fromJson( .success = result.success, }; } + +test "home env var includes managed components" { + try std.testing.expectEqualStrings("NULLCLAW_HOME", homeEnvVarForComponent("nullclaw").?); + try std.testing.expectEqualStrings("NULLBOILER_HOME", homeEnvVarForComponent("nullboiler").?); + try std.testing.expectEqualStrings("NULLTICKETS_HOME", homeEnvVarForComponent("nulltickets").?); + try std.testing.expectEqualStrings("NULLWATCH_HOME", homeEnvVarForComponent("nullwatch").?); + try std.testing.expect(homeEnvVarForComponent("unknown") == null); +} diff --git a/src/core/integration.zig b/src/core/integration.zig index a51e827..3ca96de 100644 --- a/src/core/integration.zig +++ b/src/core/integration.zig @@ -9,6 +9,13 @@ pub const NullTicketsConfig = struct { api_token: ?[]const u8 = null, }; +pub const NullWatchConfig = struct { + name: []const u8, + host: []const u8 = "127.0.0.1", + port: u16 = 7710, + api_token: ?[]const u8 = null, +}; + pub const NullBoilerWorkflowConfig = struct { file_name: []const u8, pipeline_id: []const u8, @@ -35,8 +42,23 @@ pub const NullBoilerConfig = struct { tracker: ?NullBoilerTrackerConfig = null, }; +pub const NullClawTelemetryLink = struct { + configured: bool = false, + endpoint: ?[]u8 = null, + service_name: ?[]u8 = null, + auth_configured: bool = false, + source_header_configured: bool = false, + + pub fn deinit(self: *NullClawTelemetryLink, allocator: std.mem.Allocator) void { + if (self.endpoint) |value| allocator.free(value); + if (self.service_name) |value| allocator.free(value); + self.* = .{}; + } +}; + pub fn listNullTickets(allocator: std.mem.Allocator, state: *state_mod.State, paths: paths_mod.Paths) ![]NullTicketsConfig { const names = try state.instanceNames("nulltickets") orelse return allocator.alloc(NullTicketsConfig, 0); + defer state.allocator.free(names); var list: std.ArrayListUnmanaged(NullTicketsConfig) = .empty; errdefer deinitNullTicketsConfigs(allocator, list.items); defer list.deinit(allocator); @@ -52,8 +74,27 @@ pub fn listNullTickets(allocator: std.mem.Allocator, state: *state_mod.State, pa return list.toOwnedSlice(allocator); } +pub fn listNullWatch(allocator: std.mem.Allocator, state: *state_mod.State, paths: paths_mod.Paths) ![]NullWatchConfig { + const names = try state.instanceNames("nullwatch") orelse return allocator.alloc(NullWatchConfig, 0); + defer state.allocator.free(names); + var list: std.ArrayListUnmanaged(NullWatchConfig) = .empty; + errdefer deinitNullWatchConfigs(allocator, list.items); + defer list.deinit(allocator); + + for (names) |name| { + if (try loadNullWatchConfig(allocator, paths, name)) |cfg| { + var owned = cfg; + errdefer deinitNullWatchConfig(allocator, &owned); + try list.append(allocator, owned); + } + } + + return list.toOwnedSlice(allocator); +} + pub fn listNullBoilers(allocator: std.mem.Allocator, state: *state_mod.State, paths: paths_mod.Paths) ![]NullBoilerConfig { const names = try state.instanceNames("nullboiler") orelse return allocator.alloc(NullBoilerConfig, 0); + defer state.allocator.free(names); var list: std.ArrayListUnmanaged(NullBoilerConfig) = .empty; errdefer deinitNullBoilerConfigs(allocator, list.items); defer list.deinit(allocator); @@ -91,6 +132,29 @@ pub fn loadNullTicketsConfig(allocator: std.mem.Allocator, paths: paths_mod.Path }; } +pub fn loadNullWatchConfig(allocator: std.mem.Allocator, paths: paths_mod.Paths, name: []const u8) !?NullWatchConfig { + const config_path = paths.instanceConfig(allocator, "nullwatch", name) catch return null; + defer allocator.free(config_path); + + const file = std_compat.fs.openFileAbsolute(config_path, .{}) catch return null; + defer file.close(); + + const bytes = try file.readToEndAlloc(allocator, 1024 * 1024); + defer allocator.free(bytes); + const parsed = std.json.parseFromSlice(NullWatchConfigFile, allocator, bytes, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }) catch return null; + defer parsed.deinit(); + + return .{ + .name = try allocator.dupe(u8, name), + .host = try allocator.dupe(u8, parsed.value.host), + .port = parsed.value.port, + .api_token = if (parsed.value.api_token) |token| try allocator.dupe(u8, token) else null, + }; +} + pub fn loadNullBoilerConfig(allocator: std.mem.Allocator, paths: paths_mod.Paths, name: []const u8) !?NullBoilerConfig { const config_path = paths.instanceConfig(allocator, "nullboiler", name) catch return null; defer allocator.free(config_path); @@ -138,6 +202,18 @@ pub fn deinitNullTicketsConfigs(allocator: std.mem.Allocator, configs: []NullTic allocator.free(configs); } +pub fn deinitNullWatchConfig(allocator: std.mem.Allocator, cfg: *NullWatchConfig) void { + allocator.free(cfg.name); + allocator.free(cfg.host); + if (cfg.api_token) |token| allocator.free(token); + cfg.* = undefined; +} + +pub fn deinitNullWatchConfigs(allocator: std.mem.Allocator, configs: []NullWatchConfig) void { + for (configs) |*cfg| deinitNullWatchConfig(allocator, cfg); + allocator.free(configs); +} + pub fn deinitNullBoilerConfig(allocator: std.mem.Allocator, cfg: *NullBoilerConfig) void { allocator.free(cfg.name); if (cfg.api_token) |token| allocator.free(token); @@ -181,6 +257,107 @@ pub fn countLinkedBoilersForTickets(tickets_cfg: NullTicketsConfig, boilers: []c return count; } +pub fn loadNullClawTelemetryLink(allocator: std.mem.Allocator, paths: paths_mod.Paths, name: []const u8) !NullClawTelemetryLink { + const config_path = try paths.instanceConfig(allocator, "nullclaw", name); + defer allocator.free(config_path); + + const file = std_compat.fs.openFileAbsolute(config_path, .{}) catch |err| switch (err) { + error.FileNotFound => return error.NotFound, + else => return err, + }; + defer file.close(); + + const bytes = try file.readToEndAlloc(allocator, 1024 * 1024); + defer allocator.free(bytes); + + var parsed = try std.json.parseFromSlice(std.json.Value, allocator, bytes, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }); + defer parsed.deinit(); + + return try parseNullClawTelemetryLink(allocator, parsed.value); +} + +pub fn linkNullClawToNullWatch( + allocator: std.mem.Allocator, + paths: paths_mod.Paths, + claw_name: []const u8, + watch_cfg: NullWatchConfig, +) !void { + const config_path = try paths.instanceConfig(allocator, "nullclaw", claw_name); + defer allocator.free(config_path); + + const file = std_compat.fs.openFileAbsolute(config_path, .{}) catch |err| switch (err) { + error.FileNotFound => return error.NotFound, + else => return err, + }; + defer file.close(); + + const config_bytes = try file.readToEndAlloc(allocator, 1024 * 1024); + defer allocator.free(config_bytes); + + var parsed_config = try std.json.parseFromSlice(std.json.Value, allocator, config_bytes, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }); + defer parsed_config.deinit(); + if (parsed_config.value != .object) return error.InvalidConfig; + + const diagnostics_map = try ensureObjectField(allocator, &parsed_config.value.object, "diagnostics"); + try diagnostics_map.put(allocator, "backend", .{ .string = "otel" }); + + const otel_map = try ensureObjectField(allocator, diagnostics_map, "otel"); + const endpoint = try buildNullWatchEndpoint(allocator, watch_cfg); + try otel_map.put(allocator, "endpoint", .{ .string = endpoint }); + + const should_default_service = blk: { + const service_name = jsonString(otel_map.*, "service_name") orelse break :blk true; + break :blk service_name.len == 0 or std.mem.eql(u8, service_name, "nullclaw"); + }; + if (should_default_service) { + const service_name = try std.fmt.allocPrint(allocator, "nullclaw/{s}", .{claw_name}); + try otel_map.put(allocator, "service_name", .{ .string = service_name }); + } + + const headers_map = try ensureObjectField(allocator, otel_map, "headers"); + try headers_map.put(allocator, "x-nullwatch-source", .{ .string = "nullclaw" }); + if (watch_cfg.api_token) |token| { + const auth_header = try std.fmt.allocPrint(allocator, "Bearer {s}", .{token}); + try headers_map.put(allocator, "Authorization", .{ .string = auth_header }); + } else { + _ = headers_map.swapRemove("Authorization"); + } + + const rendered = try std.json.Stringify.valueAlloc(allocator, parsed_config.value, .{ + .whitespace = .indent_2, + .emit_null_optional_fields = false, + }); + defer allocator.free(rendered); + + const out = try std_compat.fs.createFileAbsolute(config_path, .{ .truncate = true }); + defer out.close(); + try out.writeAll(rendered); + try out.writeAll("\n"); +} + +pub fn findNullWatchByEndpoint(watches: []const NullWatchConfig, endpoint: ?[]const u8) ?NullWatchConfig { + const value = endpoint orelse return null; + const port = nullWatchEndpointPort(value) orelse return null; + for (watches) |watch| { + if (watch.port == port) return watch; + } + return null; +} + +pub fn buildNullWatchEndpoint(allocator: std.mem.Allocator, watch: NullWatchConfig) ![]u8 { + const host = normalizedConnectHost(watch.host); + if (std.mem.indexOfScalar(u8, host, ':') != null and !std.mem.startsWith(u8, host, "[")) { + return std.fmt.allocPrint(allocator, "http://[{s}]:{d}", .{ host, watch.port }); + } + return std.fmt.allocPrint(allocator, "http://{s}:{d}", .{ host, watch.port }); +} + pub fn extractLocalPort(url: []const u8) ?u16 { const uri = std.Uri.parse(url) catch return null; const host = uri.host orelse return null; @@ -192,6 +369,66 @@ pub fn extractLocalPort(url: []const u8) ?u16 { }; } +fn parseNullClawTelemetryLink(allocator: std.mem.Allocator, config: std.json.Value) !NullClawTelemetryLink { + const diagnostics = diagnosticsObject(config) orelse return .{}; + const backend = jsonString(diagnostics, "backend") orelse ""; + const backend_configured = std.mem.eql(u8, backend, "otel") or std.mem.eql(u8, backend, "otlp"); + + var endpoint: ?[]const u8 = null; + var service_name: ?[]const u8 = null; + if (objectField(diagnostics, "otel")) |otel| { + endpoint = jsonString(otel, "endpoint"); + service_name = jsonString(otel, "service_name"); + } + + const headers = telemetryHeadersObject(diagnostics); + const auth_configured = if (headers) |map| jsonString(map, "Authorization") != null else false; + const source_header_configured = if (headers) |map| jsonString(map, "x-nullwatch-source") != null else false; + + return .{ + .configured = backend_configured and endpoint != null, + .endpoint = if (endpoint) |value| try allocator.dupe(u8, value) else null, + .service_name = if (service_name) |value| try allocator.dupe(u8, value) else null, + .auth_configured = auth_configured, + .source_header_configured = source_header_configured, + }; +} + +fn objectField(obj: std.json.ObjectMap, key: []const u8) ?std.json.ObjectMap { + const value = obj.get(key) orelse return null; + return if (value == .object) value.object else null; +} + +fn diagnosticsObject(config: std.json.Value) ?std.json.ObjectMap { + if (config != .object) return null; + return objectField(config.object, "diagnostics"); +} + +fn telemetryHeadersObject(diagnostics: std.json.ObjectMap) ?std.json.ObjectMap { + if (objectField(diagnostics, "otel")) |otel| { + if (objectField(otel, "headers")) |headers| return headers; + } + return null; +} + +fn nullWatchEndpointPort(endpoint: []const u8) ?u16 { + if (extractLocalPort(endpoint)) |port| return port; + const uri = std.Uri.parse(endpoint) catch return null; + return uri.port; +} + +fn normalizedConnectHost(host: []const u8) []const u8 { + if (host.len == 0 or + std.mem.eql(u8, host, "0.0.0.0") or + std.mem.eql(u8, host, "::") or + std.mem.eql(u8, host, "[::]") or + std.mem.eql(u8, host, "localhost")) + { + return "127.0.0.1"; + } + return host; +} + fn isLocalHost(host: []const u8) bool { return std.mem.eql(u8, host, "127.0.0.1") or std.mem.eql(u8, host, "localhost") or @@ -199,6 +436,27 @@ fn isLocalHost(host: []const u8) bool { std.mem.eql(u8, host, "::1"); } +fn jsonString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 { + const value = obj.get(key) orelse return null; + return if (value == .string) value.string else null; +} + +fn ensureObjectField( + allocator: std.mem.Allocator, + parent: *std.json.ObjectMap, + key: []const u8, +) !*std.json.ObjectMap { + if (parent.getPtr(key)) |value_ptr| { + if (value_ptr.* != .object) { + value_ptr.* = .{ .object = .empty }; + } + return &value_ptr.object; + } + + try parent.put(allocator, key, .{ .object = .empty }); + return &parent.getPtr(key).?.object; +} + fn loadPrimaryWorkflowConfig(allocator: std.mem.Allocator, workflows_dir: []const u8) !?NullBoilerWorkflowConfig { var dir = std_compat.fs.openDirAbsolute(workflows_dir, .{ .iterate = true }) catch return null; defer dir.close(); @@ -259,6 +517,12 @@ const NullTicketsConfigFile = struct { api_token: ?[]const u8 = null, }; +const NullWatchConfigFile = struct { + host: []const u8 = "127.0.0.1", + port: u16 = 7710, + api_token: ?[]const u8 = null, +}; + const NullBoilerConfigFile = struct { port: u16 = 8080, api_token: ?[]const u8 = null, diff --git a/src/core/launch_args.zig b/src/core/launch_args.zig index b1b067a..b800983 100644 --- a/src/core/launch_args.zig +++ b/src/core/launch_args.zig @@ -57,6 +57,29 @@ pub fn buildLaunchArgs( return resolved.argv; } +pub fn fromManifestLaunch( + allocator: std.mem.Allocator, + component: []const u8, + command: []const u8, + args: []const []const u8, +) ![]const u8 { + var tokens = std.array_list.Managed([]const u8).init(allocator); + defer tokens.deinit(); + + if (!isBinaryCommand(component, command) and command.len > 0) { + try tokens.append(command); + } + for (args) |arg| try tokens.append(arg); + if (tokens.items.len == 0) return error.InvalidLaunchMode; + + var buf = std.array_list.Managed(u8).init(allocator); + errdefer buf.deinit(); + for (tokens.items, 0..) |token, index| { + try appendLaunchModeToken(&buf, token, index == 0); + } + return buf.toOwnedSlice(); +} + pub fn freeOwnedArgv(allocator: std.mem.Allocator, argv: []const []const u8) void { if (argv.len == 0) return; for (argv) |arg| allocator.free(arg); @@ -130,6 +153,37 @@ fn parseLaunchMode(allocator: std.mem.Allocator, launch_mode: []const u8) !std.A return list; } +fn isBinaryCommand(component: []const u8, command: []const u8) bool { + const base = std.fs.path.basename(command); + if (std.mem.eql(u8, base, component)) return true; + return std.mem.endsWith(u8, base, ".exe") and + std.mem.eql(u8, base[0 .. base.len - ".exe".len], component); +} + +fn appendLaunchModeToken(buf: *std.array_list.Managed(u8), token: []const u8, first: bool) !void { + if (!first) try buf.append(' '); + + var needs_quote = token.len == 0; + for (token) |ch| { + if (std.ascii.isWhitespace(ch) or ch == '"' or ch == '\\') { + needs_quote = true; + break; + } + } + + if (!needs_quote) { + try buf.appendSlice(token); + return; + } + + try buf.append('"'); + for (token) |ch| { + if (ch == '"' or ch == '\\') try buf.append('\\'); + try buf.append(ch); + } + try buf.append('"'); +} + fn appendOwnedToken( allocator: std.mem.Allocator, list: *std.ArrayListUnmanaged([]const u8), @@ -188,6 +242,28 @@ test "buildLaunchArgs preserves tokenized launch mode when verbose disabled" { try std.testing.expectEqualStrings("bar", args[2]); } +test "fromManifestLaunch drops binary command and keeps launch args" { + const allocator = std.testing.allocator; + const mode = try fromManifestLaunch(allocator, "nullwatch", "nullwatch", &.{"serve"}); + defer allocator.free(mode); + try std.testing.expectEqualStrings("serve", mode); +} + +test "fromManifestLaunch keeps command args and quotes round-trippable tokens" { + const allocator = std.testing.allocator; + const args = [_][]const u8{ "--label", "hello world" }; + const mode = try fromManifestLaunch(allocator, "demo", "serve", &args); + defer allocator.free(mode); + + var resolved = try resolve(allocator, mode, false); + defer resolved.deinit(); + + try std.testing.expectEqual(@as(usize, 3), resolved.argv.len); + try std.testing.expectEqualStrings("serve", resolved.argv[0]); + try std.testing.expectEqualStrings("--label", resolved.argv[1]); + try std.testing.expectEqualStrings("hello world", resolved.argv[2]); +} + test "resolve preserves quoted arguments with spaces" { const allocator = std.testing.allocator; var resolved = try resolve(allocator, "agent --prompt \"hello world\" 'two words'", false); diff --git a/src/installer/orchestrator.zig b/src/installer/orchestrator.zig index 0fd649f..d9b276f 100644 --- a/src/installer/orchestrator.zig +++ b/src/installer/orchestrator.zig @@ -195,7 +195,17 @@ pub fn install( } else |_| {} // Use parsed manifest values or fall back to registry defaults - const launch_command = if (parsed_manifest) |pm| pm.value.launch.command else comp.default_launch_command; + var owned_launch_command: ?[]const u8 = null; + defer if (owned_launch_command) |value| allocator.free(value); + const launch_command = if (parsed_manifest) |pm| blk: { + owned_launch_command = launch_args_mod.fromManifestLaunch( + allocator, + opts.component, + pm.value.launch.command, + pm.value.launch.args, + ) catch null; + break :blk owned_launch_command orelse comp.default_launch_command; + } else comp.default_launch_command; const health_endpoint = if (parsed_manifest) |pm| pm.value.health.endpoint else comp.default_health_endpoint; const default_port = if (parsed_manifest) |pm| (if (pm.value.ports.len > 0) pm.value.ports[0].default else comp.default_port) else comp.default_port; defer if (parsed_manifest) |pm| pm.deinit(); @@ -365,28 +375,58 @@ fn resolveConfiguredPort( paths: paths_mod.Paths, state: *state_mod.State, ) u16 { - const parsed = std.json.parseFromSlice( - struct { - port: ?u16 = null, - gateway_port: ?u16 = null, - answers: ?struct { - port: ?u16 = null, - gateway_port: ?u16 = null, - } = null, - }, - allocator, - answers_json, - .{ .allocate = .alloc_if_needed, .ignore_unknown_fields = true }, - ) catch return findNextAvailablePort(allocator, default_port, paths, state); + const requested = resolveRequestedPort(allocator, answers_json) orelse + return findNextAvailablePort(allocator, default_port, paths, state); + + if (requested == default_port) { + return findNextAvailablePort(allocator, default_port, paths, state); + } + + var used_ports = collectConfiguredPorts(allocator, paths, state) catch return if (isPortFree(requested)) + requested + else + findFreePort(requested); + defer used_ports.deinit(); + if (used_ports.contains(requested) or !isPortFree(requested)) { + return findNextAvailablePort(allocator, requested, paths, state); + } + return requested; +} + +fn resolveRequestedPort(allocator: std.mem.Allocator, answers_json: []const u8) ?u16 { + const parsed = std.json.parseFromSlice(std.json.Value, allocator, answers_json, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }) catch return null; defer parsed.deinit(); + if (parsed.value != .object) return null; - if (parsed.value.port) |v| return v; - if (parsed.value.gateway_port) |v| return v; - if (parsed.value.answers) |a| { - if (a.port) |v| return v; - if (a.gateway_port) |v| return v; + if (parsed.value.object.get("port")) |value| { + if (parsePortValue(value)) |port| return port; + } + if (parsed.value.object.get("gateway_port")) |value| { + if (parsePortValue(value)) |port| return port; + } + if (parsed.value.object.get("answers")) |answers| { + if (answers == .object) { + if (answers.object.get("port")) |value| { + if (parsePortValue(value)) |port| return port; + } + if (answers.object.get("gateway_port")) |value| { + if (parsePortValue(value)) |port| return port; + } + } } - return findNextAvailablePort(allocator, default_port, paths, state); + return null; +} + +fn parsePortValue(value: std.json.Value) ?u16 { + return switch (value) { + .integer => |raw| if (raw >= 0 and raw <= 65535) @intCast(raw) else null, + .number_string => |raw| std.fmt.parseInt(u16, raw, 10) catch null, + .string => |raw| std.fmt.parseInt(u16, raw, 10) catch null, + else => null, + }; } fn persistAndStartInstance( @@ -567,10 +607,14 @@ fn readPortFromConfigPath(allocator: std.mem.Allocator, config_path: []const u8, } } - return switch (current) { - .integer => |value| if (value >= 0 and value <= 65535) @intCast(value) else null, - else => null, - }; + return parsePortValue(current); +} + +fn shouldWritePortField(value: ?std.json.Value, port: u16, overwrite: bool) bool { + if (overwrite) return true; + const existing = value orelse return true; + const existing_port = parsePortValue(existing) orelse return true; + return existing_port != port; } fn injectPortFields( @@ -587,15 +631,21 @@ fn injectPortFields( if (parsed.value != .object) return error.InvalidJson; var root = &parsed.value.object; - if (overwrite or root.get("port") == null) { + if (shouldWritePortField(root.get("port"), port, overwrite)) { try root.put(allocator, "port", .{ .integer = @as(i64, port) }); } - if (overwrite or root.get("gateway_port") == null) { + if (shouldWritePortField(root.get("gateway_port"), port, overwrite)) { try root.put(allocator, "gateway_port", .{ .integer = @as(i64, port) }); } if (root.getPtr("gateway")) |gateway_value| { - if (gateway_value.* == .object and (overwrite or gateway_value.object.get("port") == null)) { - try gateway_value.object.put(allocator, "port", .{ .integer = @as(i64, port) }); + if (gateway_value.* == .object) { + if (shouldWritePortField(gateway_value.object.get("port"), port, overwrite)) { + try gateway_value.object.put(allocator, "port", .{ .integer = @as(i64, port) }); + } + } else if (overwrite) { + var gateway_obj: std.json.ObjectMap = .empty; + try gateway_obj.put(allocator, "port", .{ .integer = @as(i64, port) }); + gateway_value.* = .{ .object = gateway_obj }; } } else { var gateway_obj: std.json.ObjectMap = .empty; @@ -1203,6 +1253,19 @@ test "resolveConfiguredPort reads top-level port" { try std.testing.expectEqual(@as(u16, 9001), port); } +test "resolveConfiguredPort reads string port" { + const allocator = std.testing.allocator; + var fixture = try test_helpers.TempPaths.init(allocator); + defer fixture.deinit(); + const state_path = try fixture.paths.state(allocator); + defer allocator.free(state_path); + var state = state_mod.State.init(allocator, state_path); + defer state.deinit(); + + const port = resolveConfiguredPort(allocator, "{\"port\":\"9002\"}", 8080, fixture.paths, &state); + try std.testing.expectEqual(@as(u16, 9002), port); +} + test "resolveConfiguredPort reads nested answers port" { const allocator = std.testing.allocator; var fixture = try test_helpers.TempPaths.init(allocator); @@ -1253,6 +1316,80 @@ test "resolveConfiguredPort skips configured instance ports" { try std.testing.expect(port > 43000); } +test "resolveConfiguredPort skips configured string default port" { + const allocator = std.testing.allocator; + var fixture = try test_helpers.TempPaths.init(allocator); + defer fixture.deinit(); + try fixture.paths.ensureDirs(); + + const state_path = try fixture.paths.state(allocator); + defer allocator.free(state_path); + var state = state_mod.State.init(allocator, state_path); + defer state.deinit(); + try state.addInstance("nullwatch", "watch-1", .{ + .version = "v2026.3.8", + .auto_start = true, + .launch_mode = "serve", + }); + + const comp_dir = try std.fs.path.join(allocator, &.{ fixture.paths.root, "instances", "nullwatch" }); + defer allocator.free(comp_dir); + std_compat.fs.makeDirAbsolute(comp_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + const inst_dir = try fixture.paths.instanceDir(allocator, "nullwatch", "watch-1"); + defer allocator.free(inst_dir); + std_compat.fs.makeDirAbsolute(inst_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + + const config_path = try fixture.paths.instanceConfig(allocator, "nullwatch", "watch-1"); + defer allocator.free(config_path); + try writeFile(config_path, "{\"port\":\"43010\"}"); + + const port = resolveConfiguredPort(allocator, "{\"port\":\"43010\"}", 43010, fixture.paths, &state); + try std.testing.expect(port > 43010); +} + +test "resolveConfiguredPort skips configured explicit non-default port" { + const allocator = std.testing.allocator; + var fixture = try test_helpers.TempPaths.init(allocator); + defer fixture.deinit(); + try fixture.paths.ensureDirs(); + + const state_path = try fixture.paths.state(allocator); + defer allocator.free(state_path); + var state = state_mod.State.init(allocator, state_path); + defer state.deinit(); + try state.addInstance("nullwatch", "watch-1", .{ + .version = "v2026.3.8", + .auto_start = true, + .launch_mode = "serve", + }); + + const comp_dir = try std.fs.path.join(allocator, &.{ fixture.paths.root, "instances", "nullwatch" }); + defer allocator.free(comp_dir); + std_compat.fs.makeDirAbsolute(comp_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + const inst_dir = try fixture.paths.instanceDir(allocator, "nullwatch", "watch-1"); + defer allocator.free(inst_dir); + std_compat.fs.makeDirAbsolute(inst_dir) catch |err| switch (err) { + error.PathAlreadyExists => {}, + else => return err, + }; + + const config_path = try fixture.paths.instanceConfig(allocator, "nullwatch", "watch-1"); + defer allocator.free(config_path); + try writeFile(config_path, "{\"port\":43020}"); + + const port = resolveConfiguredPort(allocator, "{\"port\":43020}", 7710, fixture.paths, &state); + try std.testing.expect(port > 43020); +} + test "injectPortFields fills missing port fields" { const allocator = std.testing.allocator; const rendered = try injectPortFields(allocator, "{\"instance_name\":\"instance-2\"}", 3002, false); @@ -1269,6 +1406,27 @@ test "injectPortFields fills missing port fields" { try std.testing.expectEqual(@as(i64, 3002), parsed.value.object.get("gateway").?.object.get("port").?.integer); } +test "injectPortFields overwrites stale string port fields" { + const allocator = std.testing.allocator; + const rendered = try injectPortFields( + allocator, + "{\"instance_name\":\"instance-2\",\"port\":\"3000\",\"gateway_port\":\"3000\",\"gateway\":{\"port\":\"3000\"}}", + 3002, + false, + ); + defer allocator.free(rendered); + + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, rendered, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }); + defer parsed.deinit(); + + try std.testing.expectEqual(@as(i64, 3002), parsed.value.object.get("port").?.integer); + try std.testing.expectEqual(@as(i64, 3002), parsed.value.object.get("gateway_port").?.integer); + try std.testing.expectEqual(@as(i64, 3002), parsed.value.object.get("gateway").?.object.get("port").?.integer); +} + test "injectPortFields overwrites existing port fields when requested" { const allocator = std.testing.allocator; const rendered = try injectPortFields( diff --git a/src/installer/registry.zig b/src/installer/registry.zig index fd124b6..f795216 100644 --- a/src/installer/registry.zig +++ b/src/installer/registry.zig @@ -48,6 +48,14 @@ pub const known_components = [_]KnownComponent{ .repo = "nullclaw/nulltickets", .is_alpha = true, }, + .{ + .name = "nullwatch", + .display_name = "NullWatch", + .description = "Headless observability, tracing, evals, and run intelligence for lightweight agent infrastructure.", + .repo = "nullclaw/nullwatch", + .default_launch_command = "serve", + .default_port = 7710, + }, }; /// Look up a component by name in the known_components list. @@ -238,6 +246,14 @@ test "findKnownComponent returns nullboiler" { try std.testing.expectEqualStrings("nullclaw/NullBoiler", comp.?.repo); } +test "findKnownComponent returns nullwatch" { + const comp = findKnownComponent("nullwatch"); + try std.testing.expect(comp != null); + try std.testing.expectEqualStrings("nullclaw/nullwatch", comp.?.repo); + try std.testing.expectEqualStrings("serve", comp.?.default_launch_command); + try std.testing.expectEqual(@as(u16, 7710), comp.?.default_port); +} + test "findKnownComponent returns null for unknown" { try std.testing.expect(findKnownComponent("nonexistent") == null); } diff --git a/src/root.zig b/src/root.zig index b98fa0a..aeb7966 100644 --- a/src/root.zig +++ b/src/root.zig @@ -17,6 +17,7 @@ pub const manager = @import("supervisor/manager.zig"); pub const managed_skills = @import("managed_skills.zig"); pub const meta_api = @import("api/meta.zig"); pub const mdns = @import("mdns.zig"); +pub const observability_api = @import("api/observability.zig"); pub const orchestrator = @import("installer/orchestrator.zig"); pub const manifest = @import("core/manifest.zig"); pub const paths = @import("core/paths.zig"); @@ -64,6 +65,7 @@ test { _ = managed_skills; _ = meta_api; _ = mdns; + _ = observability_api; _ = orchestrator; _ = manifest; _ = paths; diff --git a/src/server.zig b/src/server.zig index 954fd8b..97ce8e6 100644 --- a/src/server.zig +++ b/src/server.zig @@ -14,16 +14,19 @@ const updates_api = @import("api/updates.zig"); const access = @import("access.zig"); const mdns_mod = @import("mdns.zig"); const state_mod = @import("core/state.zig"); +const integration_mod = @import("core/integration.zig"); const paths_mod = @import("core/paths.zig"); const manager_mod = @import("supervisor/manager.zig"); const process_mod = @import("supervisor/process.zig"); const runtime_state_mod = @import("supervisor/runtime_state.zig"); +const instance_runtime = @import("api/instance_runtime.zig"); const wizard_api = @import("api/wizard.zig"); const providers_api = @import("api/providers.zig"); const channels_api = @import("api/channels.zig"); const usage_api = @import("api/usage.zig"); const report_api = @import("api/report.zig"); const orchestration_api = @import("api/orchestration.zig"); +const observability_api = @import("api/observability.zig"); const launch_args_mod = @import("core/launch_args.zig"); const ui_modules = @import("installer/ui_modules.zig"); const orchestrator = @import("installer/orchestrator.zig"); @@ -156,12 +159,23 @@ pub const Server = struct { }; defer self.allocator.free(desired_binary); - var desired_launch = launch_args_mod.resolve(self.allocator, entry.launch_mode, entry.verbose) catch { + const launch_mode = normalizedLaunchModeForRestore(component, entry.launch_mode); + var desired_launch = launch_args_mod.resolve(self.allocator, launch_mode, entry.verbose) catch { self.terminatePersistedRuntime(&runtime, component, name); return false; }; defer desired_launch.deinit(); + if (!std.mem.eql(u8, launch_mode, entry.launch_mode)) { + _ = self.state.updateInstance(component, name, .{ + .version = entry.version, + .auto_start = entry.auto_start, + .launch_mode = launch_mode, + .verbose = entry.verbose, + }) catch {}; + self.state.save() catch {}; + } + if (!persistedMatchesDesired(runtime, desired_binary, desired_launch.primary_command, desired_launch.argv)) { self.terminatePersistedRuntime(&runtime, component, name); return false; @@ -172,6 +186,17 @@ pub const Server = struct { return restored; } + fn normalizedLaunchModeForRestore(component: []const u8, launch_mode: []const u8) []const u8 { + const known = registry.findKnownComponent(component) orelse return launch_mode; + if (!std.mem.eql(u8, known.default_launch_command, "gateway") and std.mem.eql(u8, launch_mode, "gateway")) { + return known.default_launch_command; + } + if (std.mem.eql(u8, component, "nullwatch") and std.mem.eql(u8, launch_mode, "nullwatch")) { + return known.default_launch_command; + } + return launch_mode; + } + fn terminatePersistedRuntime( self: *Server, runtime: *runtime_state_mod.PersistedRuntime, @@ -524,6 +549,10 @@ pub const Server = struct { "NULLTICKETS_URL" else if (std.mem.eql(u8, name, "NULLTICKETS_TOKEN")) "NULLTICKETS_TOKEN" + else if (std.mem.eql(u8, name, "NULLWATCH_URL")) + "NULLWATCH_URL" + else if (std.mem.eql(u8, name, "NULLWATCH_TOKEN")) + "NULLWATCH_TOKEN" else return null; return if (std.c.getenv(name_z)) |value| std.mem.span(value) else null; @@ -549,8 +578,132 @@ pub const Server = struct { return getEnv("NULLTICKETS_TOKEN"); } + const WatchTarget = struct { + url: ?[]const u8 = null, + url_owned: bool = false, + token: ?[]const u8 = null, + token_owned: bool = false, + + fn deinit(self: WatchTarget, allocator: std.mem.Allocator) void { + if (self.url_owned) if (self.url) |value| allocator.free(value); + if (self.token_owned) if (self.token) |value| allocator.free(value); + } + }; + + const WatchCandidate = struct { + name: []const u8, + port: u16, + }; + + const WatchCandidateSelection = struct { + running: ?WatchCandidate = null, + starting: ?WatchCandidate = null, + selected: ?WatchCandidate = null, + + fn prefer(current: ?WatchCandidate, next: WatchCandidate) WatchCandidate { + const existing = current orelse return next; + return if (std.mem.order(u8, next.name, existing.name) == .lt) next else existing; + } + + fn add(self: *@This(), selected_name: ?[]const u8, candidate: WatchCandidate, status: manager_mod.Status) void { + if (candidate.port == 0) return; + + switch (status) { + .running => { + if (selected_name) |name| { + if (std.mem.eql(u8, name, candidate.name)) self.selected = candidate; + } + self.running = prefer(self.running, candidate); + }, + .starting, .restarting => { + if (selected_name) |name| { + if (std.mem.eql(u8, name, candidate.name)) self.selected = candidate; + } + self.starting = prefer(self.starting, candidate); + }, + .stopped, .stopping, .failed => {}, + } + } + }; + + fn getWatchTarget(self: *Server, allocator: std.mem.Allocator, selected_name: ?[]const u8) WatchTarget { + const env_token = getEnv("NULLWATCH_TOKEN"); + if (selected_name == null) { + if (getEnv("NULLWATCH_URL")) |url| return .{ .url = url, .token = env_token }; + } + return self.getManagedWatchTarget(allocator, env_token, selected_name) catch .{ .token = env_token }; + } + + fn getManagedWatchTarget(self: *Server, allocator: std.mem.Allocator, token_override: ?[]const u8, selected_name: ?[]const u8) !WatchTarget { + var candidates = WatchCandidateSelection{}; + + if (self.state.instances.getPtr("nullwatch")) |watch_instances| { + var state_it = watch_instances.iterator(); + while (state_it.next()) |entry| { + const snapshot = instance_runtime.resolve( + allocator, + self.paths, + self.manager, + "nullwatch", + entry.key_ptr.*, + entry.value_ptr.*, + ); + candidates.add(selected_name, .{ .name = entry.key_ptr.*, .port = snapshot.port }, snapshot.status); + } + } + + var manager_it = self.manager.instances.iterator(); + while (manager_it.next()) |entry| { + const inst = entry.value_ptr.*; + if (!std.mem.eql(u8, inst.component, "nullwatch")) continue; + candidates.add(selected_name, .{ .name = inst.name, .port = inst.port }, inst.status); + } + + if (selected_name != null) { + if (candidates.selected) |candidate| { + return try self.buildManagedWatchTarget(allocator, candidate.name, candidate.port, token_override); + } + return .{ .token = token_override }; + } + if (candidates.running) |candidate| { + return try self.buildManagedWatchTarget(allocator, candidate.name, candidate.port, token_override); + } + if (candidates.starting) |candidate| { + return try self.buildManagedWatchTarget(allocator, candidate.name, candidate.port, token_override); + } + return .{ .token = token_override }; + } + + fn buildManagedWatchTarget(self: *Server, allocator: std.mem.Allocator, name: []const u8, port: u16, token_override: ?[]const u8) !WatchTarget { + var cfg = (try integration_mod.loadNullWatchConfig(allocator, self.paths, name)) orelse blk: { + const cfg_name = try allocator.dupe(u8, name); + errdefer allocator.free(cfg_name); + const cfg_host = try allocator.dupe(u8, "127.0.0.1"); + break :blk integration_mod.NullWatchConfig{ + .name = cfg_name, + .host = cfg_host, + }; + }; + defer integration_mod.deinitNullWatchConfig(allocator, &cfg); + cfg.port = port; + + var target = WatchTarget{}; + errdefer target.deinit(allocator); + target.url = try integration_mod.buildNullWatchEndpoint(allocator, cfg); + target.url_owned = true; + if (token_override) |token| { + target.token = token; + } else if (cfg.api_token) |token| { + target.token = try allocator.dupe(u8, token); + target.token_owned = true; + } + return target; + } + fn routeWithoutServerMutex(target: []const u8) bool { - return instances_api.isIntegrationPath(target) or orchestration_api.isProxyPath(target); + return instances_api.isIntegrationPath(target) or + orchestration_api.isProxyPath(target) or + observability_api.isProxyPath(target); } fn route(self: *Server, allocator: std.mem.Allocator, method: []const u8, target: []const u8, body: []const u8) Response { @@ -1128,6 +1281,25 @@ pub const Server = struct { return .{ .status = resp.status, .content_type = resp.content_type, .body = resp.body }; } + if (observability_api.isProxyPath(target)) { + const selected_watch = observability_api.selectedWatchNameAlloc(allocator, target) catch + return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; + defer if (selected_watch) |value| allocator.free(value); + + const watch_target = blk: { + self.mutex.lock(); + defer self.mutex.unlock(); + break :blk self.getWatchTarget(allocator, selected_watch); + }; + defer watch_target.deinit(allocator); + + const resp = observability_api.handle(allocator, method, target, body, .{ + .watch_url = watch_target.url, + .watch_token = watch_target.token, + }); + return .{ .status = resp.status, .content_type = resp.content_type, .body = resp.body }; + } + // Serve UI module files from data directory (~/.nullhub/ui/{name}@{version}/...) if (!auth.isApiPath(target) and std.mem.startsWith(u8, target, "/ui/")) { // Check if this looks like a module path: /ui/{name}@{version}/... @@ -1628,6 +1800,54 @@ test "reconcileInstancesOnBoot terminates mismatched persisted runtime without r try std.testing.expectEqualStrings("started\n", contents); } +test "reconcileInstancesOnBoot adopts legacy nullwatch launch mode as serve" { + const builtin = @import("builtin"); + if (comptime builtin.os.tag == .windows) return error.SkipZigTest; + + const allocator = std.testing.allocator; + var ctx = TestContext.init(allocator); + defer ctx.deinit(allocator); + try ctx.paths.ensureDirs(); + + const binary_path = try ctx.paths.binary(allocator, "nullwatch", "1.0.0"); + defer allocator.free(binary_path); + + try ctx.state.addInstance("nullwatch", "watch", .{ + .version = "1.0.0", + .auto_start = false, + .launch_mode = "gateway", + }); + + var launch = try launch_args_mod.resolve(allocator, "serve", false); + defer launch.deinit(); + + const spawned = try process_mod.spawn(allocator, .{ + .binary = "/bin/sleep", + .argv = &.{"60"}, + }); + + try runtime_state_mod.write(allocator, ctx.paths, "nullwatch", "watch", .{ + .pid = process_mod.persistedPidValue(spawned.pid).?, + .port = 0, + .health_endpoint = "/health", + .binary_path = binary_path, + .launch_command = launch.primary_command, + .launch_args = launch.argv, + .started_at = std_compat.time.milliTimestamp(), + .starting_since = std_compat.time.milliTimestamp(), + }); + + ctx.reconcileInstancesOnBoot(); + + const status = ctx.manager.getStatus("nullwatch", "watch").?; + try std.testing.expectEqual(manager_mod.Status.running, status.status); + try std.testing.expect(process_mod.isAlive(spawned.pid)); + try std.testing.expectEqualStrings("serve", ctx.state.getInstance("nullwatch", "watch").?.launch_mode); + + ctx.manager.stopInstance("nullwatch", "watch") catch {}; + _ = spawned.child.wait() catch {}; +} + test "route GET /api/status returns version and platform" { var ctx = TestContext.init(std.testing.allocator); defer ctx.deinit(std.testing.allocator); @@ -1853,10 +2073,141 @@ test "routeWithoutServerMutex keeps orchestration proxy requests off global lock try std.testing.expect(Server.routeWithoutServerMutex("/api/orchestration")); try std.testing.expect(Server.routeWithoutServerMutex("/api/orchestration/runs")); try std.testing.expect(Server.routeWithoutServerMutex("/api/orchestration/store/search")); + try std.testing.expect(Server.routeWithoutServerMutex("/api/observability/v1/runs")); try std.testing.expect(Server.routeWithoutServerMutex("/api/instances/nullclaw/demo/logs")); try std.testing.expect(!Server.routeWithoutServerMutex("/api/components")); } +test "managed NullWatch target is discovered from supervisor state" { + const allocator = std.testing.allocator; + var ctx = TestContext.init(allocator); + defer ctx.deinit(allocator); + + const key = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ "nullwatch", "watch" }); + try ctx.manager.instances.put(key, .{ + .component = "nullwatch", + .name = "watch", + .status = .running, + .port = 7710, + }); + + const target = try ctx.server.getManagedWatchTarget(allocator, null, null); + defer target.deinit(allocator); + try std.testing.expect(target.url != null); + try std.testing.expectEqualStrings("http://127.0.0.1:7710", target.url.?); + try std.testing.expect(target.token == null); +} + +test "managed NullWatch target prefers first running instance by name" { + const allocator = std.testing.allocator; + var ctx = TestContext.init(allocator); + defer ctx.deinit(allocator); + + const key_z = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ "nullwatch", "zulu" }); + try ctx.manager.instances.put(key_z, .{ + .component = "nullwatch", + .name = "zulu", + .status = .running, + .port = 7712, + }); + const key_a = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ "nullwatch", "alpha" }); + try ctx.manager.instances.put(key_a, .{ + .component = "nullwatch", + .name = "alpha", + .status = .running, + .port = 7711, + }); + + const target = try ctx.server.getManagedWatchTarget(allocator, null, null); + defer target.deinit(allocator); + try std.testing.expectEqualStrings("http://127.0.0.1:7711", target.url.?); +} + +test "managed NullWatch target can select a specific running instance" { + const allocator = std.testing.allocator; + var ctx = TestContext.init(allocator); + defer ctx.deinit(allocator); + + const key_alpha = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ "nullwatch", "alpha" }); + try ctx.manager.instances.put(key_alpha, .{ + .component = "nullwatch", + .name = "alpha", + .status = .running, + .port = 7711, + }); + const key_zulu = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ "nullwatch", "zulu" }); + try ctx.manager.instances.put(key_zulu, .{ + .component = "nullwatch", + .name = "zulu", + .status = .running, + .port = 7712, + }); + + const target = try ctx.server.getManagedWatchTarget(allocator, null, "zulu"); + defer target.deinit(allocator); + try std.testing.expectEqualStrings("http://127.0.0.1:7712", target.url.?); +} + +test "managed NullWatch target reads host and token from config" { + const allocator = std.testing.allocator; + var ctx = TestContext.init(allocator); + defer ctx.deinit(allocator); + try ctx.paths.ensureDirs(); + + const inst_dir = try ctx.paths.instanceDir(allocator, "nullwatch", "watch"); + defer allocator.free(inst_dir); + try std_compat.fs.makeDirAbsolute(inst_dir); + + const config_path = try ctx.paths.instanceConfig(allocator, "nullwatch", "watch"); + defer allocator.free(config_path); + var file = try std_compat.fs.createFileAbsolute(config_path, .{ .truncate = true }); + defer file.close(); + try file.writeAll("{\"host\":\"0.0.0.0\",\"api_token\":\"managed-secret\"}"); + + const key = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ "nullwatch", "watch" }); + try ctx.manager.instances.put(key, .{ + .component = "nullwatch", + .name = "watch", + .status = .running, + .port = 7710, + }); + + const target = try ctx.server.getManagedWatchTarget(allocator, null, null); + defer target.deinit(allocator); + try std.testing.expectEqualStrings("http://127.0.0.1:7710", target.url.?); + try std.testing.expectEqualStrings("managed-secret", target.token.?); +} + +test "managed NullWatch target brackets IPv6 host and lets env token override config" { + const allocator = std.testing.allocator; + var ctx = TestContext.init(allocator); + defer ctx.deinit(allocator); + try ctx.paths.ensureDirs(); + + const inst_dir = try ctx.paths.instanceDir(allocator, "nullwatch", "watch"); + defer allocator.free(inst_dir); + try std_compat.fs.makeDirAbsolute(inst_dir); + + const config_path = try ctx.paths.instanceConfig(allocator, "nullwatch", "watch"); + defer allocator.free(config_path); + var file = try std_compat.fs.createFileAbsolute(config_path, .{ .truncate = true }); + defer file.close(); + try file.writeAll("{\"host\":\"::1\",\"api_token\":\"managed-secret\"}"); + + const key = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ "nullwatch", "watch" }); + try ctx.manager.instances.put(key, .{ + .component = "nullwatch", + .name = "watch", + .status = .running, + .port = 7710, + }); + + const target = try ctx.server.getManagedWatchTarget(allocator, "env-secret", null); + defer target.deinit(allocator); + try std.testing.expectEqualStrings("http://[::1]:7710", target.url.?); + try std.testing.expectEqualStrings("env-secret", target.token.?); +} + test "extractBody returns body after headers" { const raw = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\nhello world"; try std.testing.expectEqualStrings("hello world", extractBody(raw)); diff --git a/src/supervisor/manager.zig b/src/supervisor/manager.zig index 09efa77..6c63ce4 100644 --- a/src/supervisor/manager.zig +++ b/src/supervisor/manager.zig @@ -336,6 +336,68 @@ pub const Manager = struct { self.clearPid(inst); } + const HealthHost = struct { + value: []const u8 = "127.0.0.1", + owned: bool = false, + + fn deinit(self: HealthHost, allocator: std.mem.Allocator) void { + if (self.owned) allocator.free(self.value); + } + }; + + fn normalizeHealthHost(allocator: std.mem.Allocator, host: []const u8) ![]const u8 { + if (host.len == 0 or + std.mem.eql(u8, host, "0.0.0.0") or + std.mem.eql(u8, host, "::") or + std.mem.eql(u8, host, "localhost")) + { + return allocator.dupe(u8, "127.0.0.1"); + } + return allocator.dupe(u8, host); + } + + fn readHealthHostFromConfigPath(self: *Manager, config_path: []const u8) ?[]const u8 { + const file = std_compat.fs.openFileAbsolute(config_path, .{}) catch return null; + defer file.close(); + + const contents = file.readToEndAlloc(self.allocator, 1024 * 1024) catch return null; + defer self.allocator.free(contents); + + const parsed = std.json.parseFromSlice(std.json.Value, self.allocator, contents, .{ + .allocate = .alloc_always, + .ignore_unknown_fields = true, + }) catch return null; + defer parsed.deinit(); + if (parsed.value != .object) return null; + + const host_value = parsed.value.object.get("host") orelse return null; + if (host_value != .string) return null; + return normalizeHealthHost(self.allocator, host_value.string) catch null; + } + + fn instanceConfigPathForHealth(self: *Manager, inst: *const ManagedInstance) ?[]const u8 { + if (inst.config_path.len > 0) { + return self.allocator.dupe(u8, inst.config_path) catch null; + } + if (inst.working_dir.len > 0) { + return std.fs.path.join(self.allocator, &.{ inst.working_dir, "config.json" }) catch null; + } + return null; + } + + fn resolveInstanceHealthHost(self: *Manager, inst: *const ManagedInstance) HealthHost { + const config_path = self.instanceConfigPathForHealth(inst) orelse return .{}; + defer self.allocator.free(config_path); + const host = self.readHealthHostFromConfigPath(config_path) orelse return .{}; + return .{ .value = host, .owned = true }; + } + + fn checkInstanceHealth(self: *Manager, inst: *const ManagedInstance) health.HealthCheckResult { + const host = self.resolveInstanceHealthHost(inst); + defer host.deinit(self.allocator); + return health.check(self.allocator, host.value, inst.port, inst.health_endpoint); + } + /// Start an instance. binary_path is the path to the component binary. pub fn startInstance( self: *Manager, @@ -464,10 +526,17 @@ pub const Manager = struct { errdefer owned.deinit(self.allocator); const now = std_compat.time.milliTimestamp(); - const probe = if (runtime.port > 0) - health.check(self.allocator, "127.0.0.1", runtime.port, runtime.health_endpoint) - else - health.HealthCheckResult{ .ok = true }; + const probe = if (runtime.port > 0) blk: { + const probe_inst = ManagedInstance{ + .component = owned.component, + .name = owned.name, + .port = runtime.port, + .health_endpoint = owned.health_endpoint, + .working_dir = owned.working_dir, + .config_path = owned.config_path, + }; + break :blk self.checkInstanceHealth(&probe_inst); + } else health.HealthCheckResult{ .ok = true }; const status: Status = if (runtime.port == 0 or probe.ok) .running else .starting; const starting_since = if (status == .starting) @@ -634,7 +703,7 @@ pub const Manager = struct { } // Check health endpoint - const result = health.check(self.allocator, "127.0.0.1", inst.port, inst.health_endpoint); + const result = self.checkInstanceHealth(inst); if (result.ok) { inst.status = .running; inst.last_health_ok = now; @@ -699,7 +768,7 @@ pub const Manager = struct { } inst.last_health_check = now; - const result = health.check(self.allocator, "127.0.0.1", inst.port, inst.health_endpoint); + const result = self.checkInstanceHealth(inst); if (result.ok) { if (inst.health_consecutive_failures > 0) { self.logSupervisor(inst.component, inst.name, "health check recovered after {d} consecutive failures", .{inst.health_consecutive_failures}); @@ -892,6 +961,56 @@ test "logSupervisor appends diagnostics to nullhub.log" { try std.testing.expect(std.mem.indexOf(u8, contents, "second diagnostic") != null); } +test "normalizeHealthHost maps wildcard and localhost to loopback" { + const allocator = std.testing.allocator; + + const empty = try Manager.normalizeHealthHost(allocator, ""); + defer allocator.free(empty); + try std.testing.expectEqualStrings("127.0.0.1", empty); + + const wildcard = try Manager.normalizeHealthHost(allocator, "::"); + defer allocator.free(wildcard); + try std.testing.expectEqualStrings("127.0.0.1", wildcard); + + const localhost = try Manager.normalizeHealthHost(allocator, "localhost"); + defer allocator.free(localhost); + try std.testing.expectEqualStrings("127.0.0.1", localhost); + + const ipv6 = try Manager.normalizeHealthHost(allocator, "::1"); + defer allocator.free(ipv6); + try std.testing.expectEqualStrings("::1", ipv6); +} + +test "resolveInstanceHealthHost reads host from working dir config" { + const allocator = std.testing.allocator; + var fixture = try test_helpers.TempPaths.init(allocator); + defer fixture.deinit(); + + var mgr = Manager.init(allocator, fixture.paths); + defer mgr.deinit(); + + const inst_dir = try fixture.path(allocator, "watch"); + defer allocator.free(inst_dir); + try std_compat.fs.makeDirAbsolute(inst_dir); + + const config_path = try std.fs.path.join(allocator, &.{ inst_dir, "config.json" }); + defer allocator.free(config_path); + const file = try std_compat.fs.createFileAbsolute(config_path, .{ .truncate = true }); + defer file.close(); + try file.writeAll("{\"host\":\"::1\"}"); + + const inst = ManagedInstance{ + .component = "nullwatch", + .name = "watch", + .working_dir = inst_dir, + }; + const host = mgr.resolveInstanceHealthHost(&inst); + defer host.deinit(allocator); + + try std.testing.expect(host.owned); + try std.testing.expectEqualStrings("::1", host.value); +} + test "status reporting for manually-added instance" { const allocator = std.testing.allocator; var fixture = try test_helpers.TempPaths.init(allocator); diff --git a/ui/src/lib/api/client.ts b/ui/src/lib/api/client.ts index 2a2ef84..47d9b3e 100644 --- a/ui/src/lib/api/client.ts +++ b/ui/src/lib/api/client.ts @@ -22,6 +22,9 @@ type InstanceStartOptions = { launch_mode?: string; verbose?: boolean; }; +type ObservabilityTarget = { + watch?: string; +}; async function request(path: string, options?: RequestInit): Promise { const res = await fetch(`${BASE}${path}`, { @@ -159,6 +162,61 @@ export const api = { refreshComponents: () => request('/components/refresh', { method: 'POST' }), + getObservabilityHealth: (params?: ObservabilityTarget) => + request(withQuery('/observability/health', { nullhub_watch: params?.watch })), + getObservabilitySummary: (params?: ObservabilityTarget) => + request(withQuery('/observability/v1/summary', { nullhub_watch: params?.watch })), + getObservabilityRuns: (params?: ObservabilityTarget & { run_id?: string; source?: string; operation?: string; status?: string; model?: string; tool_name?: string; verdict?: string; dataset?: string; limit?: number }) => + request( + withQuery('/observability/v1/runs', { + nullhub_watch: params?.watch, + run_id: params?.run_id, + source: params?.source, + operation: params?.operation, + status: params?.status, + model: params?.model, + tool_name: params?.tool_name, + verdict: params?.verdict, + dataset: params?.dataset, + limit: params?.limit, + }), + ), + getObservabilityRun: (runId: string, params?: ObservabilityTarget) => + request( + withQuery(`/observability/v1/runs/${encodeURIComponent(runId)}`, { + nullhub_watch: params?.watch, + }), + ), + getObservabilitySpans: (params?: ObservabilityTarget & { run_id?: string; trace_id?: string; source?: string; operation?: string; status?: string; model?: string; tool_name?: string; task_id?: string; session_id?: string; agent_id?: string; limit?: number }) => + request( + withQuery('/observability/v1/spans', { + nullhub_watch: params?.watch, + run_id: params?.run_id, + trace_id: params?.trace_id, + source: params?.source, + operation: params?.operation, + status: params?.status, + model: params?.model, + tool_name: params?.tool_name, + task_id: params?.task_id, + session_id: params?.session_id, + agent_id: params?.agent_id, + limit: params?.limit, + }), + ), + getObservabilityEvals: (params?: ObservabilityTarget & { run_id?: string; verdict?: string; eval_key?: string; scorer?: string; dataset?: string; limit?: number }) => + request( + withQuery('/observability/v1/evals', { + nullhub_watch: params?.watch, + run_id: params?.run_id, + verdict: params?.verdict, + eval_key: params?.eval_key, + scorer: params?.scorer, + dataset: params?.dataset, + limit: params?.limit, + }), + ), + applyUpdate: (c: string, n: string) => request(`/instances/${c}/${n}/update`, { method: 'POST' }), diff --git a/ui/src/lib/components/InstanceCard.svelte b/ui/src/lib/components/InstanceCard.svelte index da3edd3..dae88ef 100644 --- a/ui/src/lib/components/InstanceCard.svelte +++ b/ui/src/lib/components/InstanceCard.svelte @@ -16,6 +16,7 @@ let displayVersion = $derived( !version ? "-" : version.startsWith("v") || version.startsWith("dev-") ? version : `v${version}`, ); + let portLabel = $derived(component === "nullclaw" ? "Gateway" : "API"); // Sync localStatus when prop changes (from poll) $effect(() => { @@ -64,7 +65,7 @@ {#if localStatus === "running" && port > 0}
- Gateway: + {portLabel}: 127.0.0.1:{port}
{/if} diff --git a/ui/src/lib/components/Sidebar.svelte b/ui/src/lib/components/Sidebar.svelte index 71a72eb..a9beadc 100644 --- a/ui/src/lib/components/Sidebar.svelte +++ b/ui/src/lib/components/Sidebar.svelte @@ -83,6 +83,10 @@ Channels + + + {/if} + {#if usageData?.totals} +
+ Total: {formatTokens(usageData.totals.total_tokens)} tokens in {usageData.totals.requests || 0} request(s) +
+ {/if} + + {/if} {:else if activeTab === "history"} {#key instanceRouteKey} @@ -1111,7 +1347,7 @@

Standalone Launch

- {#if component === "nullclaw" && standaloneBinaryPath} + {#if standaloneBinaryPath}

Run this instance without nullhub, reusing the same config, auth, data, and logs directory. @@ -1154,8 +1390,7 @@

{:else}

- Standalone launch instructions are available for nullclaw - instances for now. + Standalone launch instructions are available after this instance has a versioned binary.

{/if}
@@ -1255,6 +1490,9 @@ gap: 0.75rem; } .btn { + display: inline-flex; + align-items: center; + justify-content: center; padding: 0.5rem 1rem; border: 1px solid var(--accent-dim); border-radius: 2px; @@ -1267,6 +1505,7 @@ cursor: pointer; transition: all 0.2s ease; text-shadow: var(--text-glow); + text-decoration: none; } .btn:hover { background: var(--bg-hover); diff --git a/ui/src/routes/observability/+page.svelte b/ui/src/routes/observability/+page.svelte new file mode 100644 index 0000000..85810f9 --- /dev/null +++ b/ui/src/routes/observability/+page.svelte @@ -0,0 +1,776 @@ + + +
+
+
+

Flight Recorder

+

NullWatch traces, evals, cost, and failure context

+
+
+ {#if watchOptions.length > 1} + + {:else if watchOptions.length === 1} +
+ NullWatch + {watchOptions[0].name} + {watchOptions[0].status} +
+ {/if} + +
+
+ + {#if error} +
ERR: {error}
+ {/if} + +
+
+ Runs + {summary?.run_count ?? 0} +
+
+ Spans + {summary?.span_count ?? 0} +
+
+ Errors + 0}>{summary?.error_count ?? 0} +
+
+ Eval Pass + {summary?.pass_count ?? 0} +
+
+ Eval Fail + 0}>{summary?.fail_count ?? 0} +
+
+ Cost + {formatCost(summary?.total_cost_usd)} +
+
+ + {#if loading && runs.length === 0} +
Loading observability data...
+ {:else} +
+
+
+

Runs

+ {selectedWatch ? `${selectedWatch.name} / ${runs.length}` : runs.length} +
+ {#if runs.length === 0} +
No NullWatch runs found.
+ {:else} +
+ {#each runs as run} + + {/each} +
+ {/if} +
+ +
+ {#if loadingRun} +
Loading run detail...
+ {:else if selectedRun} +
+
+

{selectedSummary?.run_id}

+
+ {formatTime(selectedSummary?.first_seen_ms)} + {formatDuration(selectedSummary?.total_duration_ms)} + {formatCost(selectedSummary?.total_cost_usd)} +
+
+ {selectedSummary?.overall_verdict} +
+ +
+
Spans{selectedSummary?.span_count || 0}
+
Errors{selectedSummary?.error_count || 0}
+
Evals{selectedSummary?.eval_count || 0}
+
Tokens{formatTokens(selectedSummary?.total_input_tokens, selectedSummary?.total_output_tokens)}
+
+ +
Span Timeline
+
+ {#each sortedSpans as span} +
+
+
+
+ {span.operation} + {span.status} +
+
+ {span.source} + {#if span.agent_id}{span.agent_id}{/if} + {#if span.tool_name}{span.tool_name}{/if} + {#if span.model}{span.model}{/if} + {formatDuration(span.duration_ms)} +
+ {#if span.error_message} +
{span.error_message}
+ {/if} + {#if span.attributes_json} +
{span.attributes_json}
+ {/if} +
+
+ {/each} +
+ +
Evals
+ {#if sortedEvals.length === 0} +
No evals attached to this run.
+ {:else} +
+ {#each sortedEvals as evaluation} +
+
+ {evaluation.eval_key} + {evaluation.scorer} · {evaluation.dataset || '-'} +
+
+ {evaluation.score.toFixed(2)} + {evaluation.verdict} +
+ {#if evaluation.notes} +

{evaluation.notes}

+ {/if} +
+ {/each} +
+ {/if} + {:else} +
Select a run.
+ {/if} +
+
+ {/if} +
+ +