diff --git a/src/lib/onboard/machine/events.ts b/src/lib/onboard/machine/events.ts new file mode 100644 index 0000000000..f6b7dca47c --- /dev/null +++ b/src/lib/onboard/machine/events.ts @@ -0,0 +1,174 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +import type { JsonObject, JsonValue } from "../../core/json-types"; +import { redactSensitiveText, redactUrl } from "../../security/redact"; +import type { HermesAuthMethod, Session } from "../../state/onboard-session"; +import type { + OnboardMachineContext, + OnboardMachineEventType, + OnboardMachineState, +} from "./types"; + +export const ONBOARD_SESSION_STEP_TO_MACHINE_STATE = { + preflight: "preflight", + gateway: "gateway", + provider_selection: "provider_selection", + inference: "inference", + sandbox: "sandbox", + agent_setup: "agent_setup", + openclaw: "openclaw", + policies: "policies", +} as const satisfies Readonly>; + +export type OnboardSessionStepName = keyof typeof ONBOARD_SESSION_STEP_TO_MACHINE_STATE; + +export interface OnboardMachineEvent { + version: 1; + type: OnboardMachineEventType; + occurredAt: string; + sessionId: string | null; + state: OnboardMachineState | null; + step: OnboardSessionStepName | null; + context: OnboardMachineContext; + error: string | null; + metadata: JsonObject; +} + +export type OnboardMachineEventListener = (event: OnboardMachineEvent) => void; + +const listeners = new Set(); + +export function addOnboardMachineEventListener( + listener: OnboardMachineEventListener, +): () => void { + listeners.add(listener); + return () => { + listeners.delete(listener); + }; +} + +export function clearOnboardMachineEventListeners(): void { + listeners.clear(); +} + +export function isOnboardSessionStepName(value: string): value is OnboardSessionStepName { + return Object.prototype.hasOwnProperty.call(ONBOARD_SESSION_STEP_TO_MACHINE_STATE, value); +} + +export function machineStateFromOnboardSessionStep( + stepName: string | null | undefined, +): OnboardMachineState | null { + if (!stepName || !isOnboardSessionStepName(stepName)) return null; + return ONBOARD_SESSION_STEP_TO_MACHINE_STATE[stepName]; +} + +function nullableString(value: unknown): string | null { + return typeof value === "string" ? value : null; +} + +function stringArray(value: unknown): string[] | null { + if (!Array.isArray(value)) return null; + return value.filter((entry): entry is string => typeof entry === "string"); +} + +function hermesAuthMethod(value: unknown): HermesAuthMethod | null { + return value === "oauth" || value === "api_key" ? value : null; +} + +function booleanValue(value: unknown): boolean | undefined { + return typeof value === "boolean" ? value : undefined; +} + +function sanitizeJsonValue(value: unknown): JsonValue { + if (typeof value === "string") return redactUrl(value) ?? redactSensitiveText(value) ?? ""; + if (typeof value === "number" && Number.isFinite(value)) return value; + if (typeof value === "boolean" || value === null) return value; + if (Array.isArray(value)) return value.map((entry) => sanitizeJsonValue(entry)); + if (typeof value !== "object") return String(value); + + const result: JsonObject = {}; + for (const [key, entry] of Object.entries(value)) { + result[key] = sanitizeJsonValue(entry); + } + return result; +} + +function endpointOrigin(value: unknown): string | null { + if (typeof value !== "string" || value.trim() === "") return null; + try { + return new URL(value).origin; + } catch { + return null; + } +} + +export function sanitizeOnboardMachineEventMetadata( + metadata: Record | null | undefined, +): JsonObject { + if (!metadata || typeof metadata !== "object" || Array.isArray(metadata)) return {}; + const sanitized: JsonObject = {}; + for (const [key, value] of Object.entries(metadata)) { + sanitized[key] = sanitizeJsonValue(value); + } + return sanitized; +} + +export function buildOnboardMachineContext(session: Session): OnboardMachineContext { + return { + agent: nullableString(session.agent), + sandboxName: nullableString(session.sandboxName), + provider: nullableString(session.provider), + model: nullableString(session.model), + endpointOrigin: endpointOrigin(session.endpointUrl), + credentialEnv: nullableString(session.credentialEnv), + preferredInferenceApi: nullableString(session.preferredInferenceApi), + hermesAuthMethod: hermesAuthMethod(session.hermesAuthMethod), + hermesToolGateways: stringArray(session.hermesToolGateways), + policyPresets: stringArray(session.policyPresets), + messagingChannels: stringArray(session.messagingChannels), + gpuPassthrough: booleanValue(session.gpuPassthrough), + }; +} + +export function createOnboardMachineEvent({ + type, + session, + step, + state, + error = null, + metadata = {}, +}: { + type: OnboardMachineEventType; + session: Session; + step?: string | null; + state?: OnboardMachineState | null; + error?: string | null; + metadata?: Record | null; +}): OnboardMachineEvent { + const normalizedStep = step && isOnboardSessionStepName(step) ? step : null; + return { + version: 1, + type, + occurredAt: new Date().toISOString(), + sessionId: nullableString(session.sessionId), + state: state ?? machineStateFromOnboardSessionStep(normalizedStep), + step: normalizedStep, + context: buildOnboardMachineContext(session), + error: redactSensitiveText(error), + metadata: sanitizeOnboardMachineEventMetadata(metadata), + }; +} + +export function emitOnboardMachineEvent(event: OnboardMachineEvent): void { + if (listeners.size === 0) return; + for (const listener of listeners) { + try { + listener(event); + } catch { + // Event observers are diagnostics only. A broken observer must not + // change onboarding behavior; hook failure events are introduced by the + // later observe-only hook API. + } + } +} diff --git a/src/lib/state/onboard-session.test.ts b/src/lib/state/onboard-session.test.ts index b2c925858f..825ad6937f 100644 --- a/src/lib/state/onboard-session.test.ts +++ b/src/lib/state/onboard-session.test.ts @@ -9,11 +9,15 @@ import { createRequire } from "node:module"; const require = createRequire(import.meta.url); const distPath = require.resolve("../../../dist/lib/state/onboard-session"); +const eventsDistPath = require.resolve("../../../dist/lib/onboard/machine/events"); const originalHome = process.env.HOME; type OnboardSessionModule = typeof import("../../../dist/lib/state/onboard-session"); +type OnboardMachineEventsModule = typeof import("../../../dist/lib/onboard/machine/events"); +type OnboardMachineEvent = import("../../../dist/lib/onboard/machine/events").OnboardMachineEvent; type LoadedSession = NonNullable>; type DebugSummary = NonNullable>; let session: OnboardSessionModule; +let machineEvents: OnboardMachineEventsModule; let tmpDir: string; function requireLoadedSession( @@ -44,13 +48,18 @@ beforeEach(() => { tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "nemoclaw-onboard-session-")); process.env.HOME = tmpDir; delete require.cache[distPath]; + delete require.cache[eventsDistPath]; session = require("../../../dist/lib/state/onboard-session"); + machineEvents = require("../../../dist/lib/onboard/machine/events"); + machineEvents.clearOnboardMachineEventListeners(); session.clearSession(); session.releaseOnboardLock(); }); afterEach(() => { + machineEvents.clearOnboardMachineEventListeners(); delete require.cache[distPath]; + delete require.cache[eventsDistPath]; fs.rmSync(tmpDir, { recursive: true, force: true }); if (originalHome === undefined) { delete process.env.HOME; @@ -117,6 +126,102 @@ describe("onboard session", () => { expect(loaded.failure.message).toMatch(/Sandbox creation failed/); }); + it("emits redacted structured machine events for session step mutations", () => { + const emitted: OnboardMachineEvent[] = []; + machineEvents.addOnboardMachineEventListener((event) => emitted.push(event)); + + session.saveSession(session.createSession({ sessionId: "session-1" })); + session.markStepStarted("gateway"); + session.markStepComplete("gateway", { + sandboxName: "my-assistant", + endpointUrl: + "https://alice:super-secret-token@example.com/v1?token=super-secret-token&keep=yes#token=super-secret-token", + credentialEnv: "NVIDIA_API_KEY", + }); + session.markStepSkipped("openclaw"); + session.markStepFailed("sandbox", "NVIDIA_API_KEY=super-secret-token"); + session.completeSession({ provider: "ollama-local", credentialEnv: null }); + + expect(emitted.map((event) => event.type)).toEqual([ + "state.entered", + "context.updated", + "state.completed", + "state.skipped", + "state.failed", + "onboard.failed", + "context.updated", + "onboard.completed", + ]); + expect(emitted[0]).toMatchObject({ + version: 1, + sessionId: "session-1", + state: "gateway", + step: "gateway", + error: null, + }); + expect(emitted[1].context).toMatchObject({ + sandboxName: "my-assistant", + credentialEnv: "NVIDIA_API_KEY", + }); + expect(emitted[1].context.endpointOrigin).toBe("https://example.com"); + expect(emitted[1].metadata.fields).toEqual([ + "sandboxName", + "endpointUrl", + "credentialEnv", + ]); + expect(emitted[4]).toMatchObject({ + type: "state.failed", + state: "sandbox", + step: "sandbox", + error: "NVIDIA_API_KEY=", + }); + expect(emitted[5]).toMatchObject({ type: "onboard.failed", state: "failed" }); + expect(emitted.at(-1)).toMatchObject({ type: "onboard.completed", state: "complete" }); + expect(JSON.stringify(emitted)).not.toContain("super-secret-token"); + + const persisted = JSON.parse(fs.readFileSync(session.SESSION_FILE, "utf8")); + expect(persisted.events).toBeUndefined(); + }); + + it("keeps event observer failures from changing session mutation behavior", () => { + machineEvents.addOnboardMachineEventListener(() => { + throw new Error("observer failed"); + }); + + session.saveSession(session.createSession()); + expect(() => session.markStepStarted("preflight")).not.toThrow(); + + const loaded = requireLoadedSession(session.loadSession()); + expect(loaded.steps.preflight.status).toBe("in_progress"); + }); + + it("does not emit machine events for unknown session step names", () => { + const emitted: OnboardMachineEvent[] = []; + machineEvents.addOnboardMachineEventListener((event) => emitted.push(event)); + + session.saveSession(session.createSession()); + session.markStepStarted("not_a_real_step"); + + expect(emitted).toEqual([]); + }); + + it("does not emit duplicate events for no-op skipped and completed transitions", () => { + const emitted: OnboardMachineEvent[] = []; + machineEvents.addOnboardMachineEventListener((event) => emitted.push(event)); + + session.saveSession(session.createSession({ sessionId: "session-1" })); + session.markStepSkipped("openclaw"); + session.markStepSkipped("openclaw"); + session.completeSession(); + session.completeSession(); + + expect(emitted.map((event) => event.type)).toEqual([ + "state.skipped", + "onboard.completed", + ]); + expect(emitted).toHaveLength(2); + }); + it("persists safe provider metadata without persisting secrets", () => { session.saveSession(session.createSession()); const unsafeProviderUpdate: Parameters[1] & { diff --git a/src/lib/state/onboard-session.ts b/src/lib/state/onboard-session.ts index f05c1116e8..a74602db39 100644 --- a/src/lib/state/onboard-session.ts +++ b/src/lib/state/onboard-session.ts @@ -18,6 +18,10 @@ import { sanitizeMessagingChannelConfig, type MessagingChannelConfig, } from "../messaging-channel-config"; +import { + createOnboardMachineEvent, + emitOnboardMachineEvent, +} from "../onboard/machine/events"; import { redactSensitiveText, redactUrl } from "../security/redact"; export const SESSION_VERSION = 1; @@ -883,7 +887,8 @@ export function updateSession(mutator: (session: Session) => Session | void): Se } export function markStepStarted(stepName: string): Session { - return updateSession((session) => { + let shouldEmit = false; + const updatedSession = updateSession((session) => { const step = session.steps[stepName]; if (!step) return session; step.status = "in_progress"; @@ -893,12 +898,21 @@ export function markStepStarted(stepName: string): Session { session.lastStepStarted = stepName; session.failure = null; session.status = "in_progress"; + shouldEmit = true; return session; }); + if (shouldEmit) { + emitOnboardMachineEvent( + createOnboardMachineEvent({ type: "state.entered", session: updatedSession, step: stepName }), + ); + } + return updatedSession; } export function markStepComplete(stepName: string, updates: SessionUpdates = {}): Session { - return updateSession((session) => { + const safeUpdates = filterSafeUpdates(updates); + let shouldEmit = false; + const updatedSession = updateSession((session) => { const step = session.steps[stepName]; if (!step) return session; step.status = "complete"; @@ -906,26 +920,52 @@ export function markStepComplete(stepName: string, updates: SessionUpdates = {}) step.error = null; session.lastCompletedStep = stepName; session.failure = null; - Object.assign(session, filterSafeUpdates(updates)); + Object.assign(session, safeUpdates); + shouldEmit = true; return session; }); + if (shouldEmit) { + if (Object.keys(safeUpdates).length > 0) { + emitOnboardMachineEvent( + createOnboardMachineEvent({ + type: "context.updated", + session: updatedSession, + step: stepName, + metadata: { fields: Object.keys(safeUpdates) }, + }), + ); + } + emitOnboardMachineEvent( + createOnboardMachineEvent({ type: "state.completed", session: updatedSession, step: stepName }), + ); + } + return updatedSession; } export function markStepSkipped(stepName: string): Session { - return updateSession((session) => { + let shouldEmit = false; + const updatedSession = updateSession((session) => { const step = session.steps[stepName]; if (!step) return session; - if (step.status === "complete" || step.status === "failed") return session; + if (step.status === "complete" || step.status === "failed" || step.status === "skipped") return session; step.status = "skipped"; step.startedAt = null; step.completedAt = null; step.error = null; + shouldEmit = true; return session; }); + if (shouldEmit) { + emitOnboardMachineEvent( + createOnboardMachineEvent({ type: "state.skipped", session: updatedSession, step: stepName }), + ); + } + return updatedSession; } export function markStepFailed(stepName: string, message: string | null = null): Session { - return updateSession((session) => { + let shouldEmit = false; + const updatedSession = updateSession((session) => { const step = session.steps[stepName]; if (!step) return session; step.status = "failed"; @@ -937,18 +977,62 @@ export function markStepFailed(stepName: string, message: string | null = null): recordedAt: new Date().toISOString(), }); session.status = "failed"; + shouldEmit = true; return session; }); + if (shouldEmit) { + emitOnboardMachineEvent( + createOnboardMachineEvent({ + type: "state.failed", + session: updatedSession, + step: stepName, + error: message, + }), + ); + emitOnboardMachineEvent( + createOnboardMachineEvent({ + type: "onboard.failed", + session: updatedSession, + state: "failed", + step: stepName, + error: message, + }), + ); + } + return updatedSession; } export function completeSession(updates: SessionUpdates = {}): Session { - return updateSession((session) => { - Object.assign(session, filterSafeUpdates(updates)); + const safeUpdates = filterSafeUpdates(updates); + let wasComplete = false; + const updatedSession = updateSession((session) => { + wasComplete = session.status === "complete"; + Object.assign(session, safeUpdates); session.status = "complete"; session.resumable = false; session.failure = null; return session; }); + if (Object.keys(safeUpdates).length > 0) { + emitOnboardMachineEvent( + createOnboardMachineEvent({ + type: "context.updated", + session: updatedSession, + state: "complete", + metadata: { fields: Object.keys(safeUpdates) }, + }), + ); + } + if (!wasComplete) { + emitOnboardMachineEvent( + createOnboardMachineEvent({ + type: "onboard.completed", + session: updatedSession, + state: "complete", + }), + ); + } + return updatedSession; } export function summarizeForDebug(