diff --git a/src/clippy-baseline.txt b/src/clippy-baseline.txt index 29e49a011..de8febe1c 100644 --- a/src/clippy-baseline.txt +++ b/src/clippy-baseline.txt @@ -1 +1 @@ -157 +168 diff --git a/src/eslint-baseline.linux.txt b/src/eslint-baseline.linux.txt index 38627a6f0..7e30bed39 100644 --- a/src/eslint-baseline.linux.txt +++ b/src/eslint-baseline.linux.txt @@ -1 +1 @@ -5432 +5431 diff --git a/src/shared/generated/events/EventClassChannelStrategy.ts b/src/shared/generated/events/EventClassChannelStrategy.ts new file mode 100644 index 000000000..44446a0e9 --- /dev/null +++ b/src/shared/generated/events/EventClassChannelStrategy.ts @@ -0,0 +1,18 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Channel-strategy for an event class — how the event-name maps to an airc + * channel when `broadcast: true`. The transport consults this at emit time. + * + * - `Local` — no broadcast (paired with `broadcast: false`). + * - `Global` — mesh-wide single channel (e.g. `#presence`). + * - `ByRoomId` — event payload must carry `roomId`; routed to that + * room's airc channel. + * - `ByPeerId` — event payload must carry `peerId`; routed to a + * peer-targeted channel (DM-like). + * - `Custom` — caller-supplied channel resolver runs at emit time. + * (The resolver itself can't cross the wire — it's a per-process + * function ref — so on the TS side the resolver is registered + * separately from the Rust-canonical config.) + */ +export type EventClassChannelStrategy = "local" | "global" | "byRoomId" | "byPeerId" | "custom"; diff --git a/src/shared/generated/events/EventClassConfig.ts b/src/shared/generated/events/EventClassConfig.ts new file mode 100644 index 000000000..da1dd1c5e --- /dev/null +++ b/src/shared/generated/events/EventClassConfig.ts @@ -0,0 +1,40 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { EventClassChannelStrategy } from "./EventClassChannelStrategy"; +import type { EventClassUnknownSchemaPolicy } from "./EventClassUnknownSchemaPolicy"; + +/** + * Caller-supplied event-class declaration. All optional fields fill with + * conservative defaults (no broadcast, no airc cost). + */ +export type EventClassConfig = { +/** + * Distribute this event class through the airc transport in addition + * to the local + WebSocket transports? + * + * `false` (default) — local + WebSocket only. Zero airc cost. + * `true` — also durable on the airc log; reaches cross-machine + * subscribers via the AircEventTransport (L1-2). + */ +broadcast: boolean, +/** + * How the event-name + payload map to an airc channel when broadcast + * is `true`. Defaults to `Local` when `broadcast: false`, otherwise + * required (validation throws on missing-when-broadcast). + */ +channel?: EventClassChannelStrategy, +/** + * Wire-format schema version. Subscribers fail loud on unknown + * versions per `on_unknown_schema`. Bump when the payload shape + * changes incompatibly. + */ +schemaVersion: string, +/** + * Action when a subscriber receives an event whose declared + * `schemaVersion` doesn't match its build. Default `Fail`. + */ +onUnknownSchema?: EventClassUnknownSchemaPolicy, +/** + * Optional human-readable description for `grid/show-event-classes` + * and similar introspection. Not load-bearing at runtime. + */ +description?: string, }; diff --git a/src/shared/generated/events/EventClassUnknownSchemaPolicy.ts b/src/shared/generated/events/EventClassUnknownSchemaPolicy.ts new file mode 100644 index 000000000..80f6d3e81 --- /dev/null +++ b/src/shared/generated/events/EventClassUnknownSchemaPolicy.ts @@ -0,0 +1,8 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Behavior when a subscriber receives an event with a `schemaVersion` + * it doesn't recognize. Default `Fail` matches the standing project rule + * of never silently swallowing evidence. + */ +export type EventClassUnknownSchemaPolicy = "warn" | "fail"; diff --git a/src/shared/generated/events/ResolvedEventClassConfig.ts b/src/shared/generated/events/ResolvedEventClassConfig.ts new file mode 100644 index 000000000..d817f6b27 --- /dev/null +++ b/src/shared/generated/events/ResolvedEventClassConfig.ts @@ -0,0 +1,9 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { EventClassChannelStrategy } from "./EventClassChannelStrategy"; +import type { EventClassUnknownSchemaPolicy } from "./EventClassUnknownSchemaPolicy"; + +/** + * Canonical, post-validation form of an event-class declaration. + * What the registry stores + what the TS side caches. + */ +export type ResolvedEventClassConfig = { name: string, broadcast: boolean, channel: EventClassChannelStrategy, schemaVersion: string, onUnknownSchema: EventClassUnknownSchemaPolicy, description: string, }; diff --git a/src/shared/generated/events/index.ts b/src/shared/generated/events/index.ts new file mode 100644 index 000000000..b0ad20dc4 --- /dev/null +++ b/src/shared/generated/events/index.ts @@ -0,0 +1,8 @@ +// Auto-generated barrel export — do not edit manually +// Source: generator/generate-rust-bindings.ts +// Re-generate: npx tsx generator/generate-rust-bindings.ts + +export type { EventClassChannelStrategy } from './EventClassChannelStrategy'; +export type { EventClassConfig } from './EventClassConfig'; +export type { EventClassUnknownSchemaPolicy } from './EventClassUnknownSchemaPolicy'; +export type { ResolvedEventClassConfig } from './ResolvedEventClassConfig'; diff --git a/src/shared/generated/governor/index.ts b/src/shared/generated/governor/index.ts index 991d321f1..e72cad0fa 100644 --- a/src/shared/generated/governor/index.ts +++ b/src/shared/generated/governor/index.ts @@ -1,6 +1,6 @@ // Auto-generated barrel export — do not edit manually -// Source: workers/continuum-core/src/governor/types.rs (ts-rs) -// Re-generate: cargo test --lib --features metal,accelerate governor:: +// Source: generator/generate-rust-bindings.ts +// Re-generate: npx tsx generator/generate-rust-bindings.ts export type { CadenceMultipliers } from './CadenceMultipliers'; export type { CascadeAction } from './CascadeAction'; diff --git a/src/shared/generated/index.ts b/src/shared/generated/index.ts index c2c70de5d..27e190319 100644 --- a/src/shared/generated/index.ts +++ b/src/shared/generated/index.ts @@ -34,14 +34,197 @@ export type { UsageMetrics } from './ai'; export type { VideoInput } from './ai'; export * from './airc'; export * from './code'; -export * from './cognition'; +// cognition: explicit exports (has duplicate types) +export type { AIDecisionContext } from './cognition'; +export type { AIGatingDecision } from './cognition'; +export type { AIGatingDecisionFactors } from './cognition'; +export type { AdaptiveThroughputPlan } from './cognition'; +export type { AdaptiveThroughputRequest } from './cognition'; +export type { AdversarialPatternDecline } from './cognition'; +export type { AnalysisError } from './cognition'; +export type { AuditEntry } from './cognition'; +export type { AuditEntryKind } from './cognition'; +export type { EmbedToolsRequest } from './cognition'; +export type { EmbedToolsResponse } from './cognition'; +export type { GatingConversationMessage } from './cognition'; +export type { GatingMessageContent } from './cognition'; +export type { GatingRagContext } from './cognition'; +export type { GatingRagMetadata } from './cognition'; +export type { GatingRecipeStrategy } from './cognition'; +export type { GatingTriggerMessage } from './cognition'; +export type { GenerateResponseAdmissionPolicy } from './cognition'; +export type { GenerateResponseRequest } from './cognition'; +export type { GenerateResponseResult } from './cognition'; +export type { HostCapability } from './cognition'; +export type { ProbeError } from './cognition'; +export type { HwCapabilityTier } from './cognition'; +export type { LeverCall } from './cognition'; +export type { LeverName } from './cognition'; +export type { LocalOrCloudPolicy } from './cognition'; +export type { MediaItemLite } from './cognition'; +export type { ModelRequirement } from './cognition'; +export type { NativeBatchOutcome } from './cognition'; +export type { ParsedToolBatch } from './cognition'; +export type { PersonaMediaConfigLite } from './cognition'; +export type { PersonaRenderRequest } from './cognition'; +export type { PersonaResponse } from './cognition'; +export type { PersonaTurnPlan } from './cognition'; +export type { PriorContribution } from './cognition'; +export type { ProposalRating } from './cognition'; +export type { RateProposalsRequest } from './cognition'; +export type { RateProposalsResponse } from './cognition'; +export type { RatingContext } from './cognition'; +export type { RatingMessage } from './cognition'; +export type { RecentMessage } from './cognition'; +export type { RecipeDefinitionShape } from './cognition'; +export type { RecipeGenerateHints } from './cognition'; +export type { RecipeGenerationRequest } from './cognition'; +export type { RecipeGenerationResponse } from './cognition'; +export type { RecipePersonaCandidate } from './cognition'; +export type { RecipeRagSourcePolicy } from './cognition'; +export type { RecipeTemplateInfo } from './cognition'; +export type { RecipeTurnBatchPlan } from './cognition'; +export type { RecipeTurnBatchRequest } from './cognition'; +export type { RecipeTurnTrigger } from './cognition'; +export type { RedundancyCheckRequest } from './cognition'; +export type { RedundancyDecision } from './cognition'; +export type { ResolutionError } from './cognition'; +export type { ResolvedModel } from './cognition'; +export type { ResourceAdmissionPolicy } from './cognition'; +export type { ResourceClass } from './cognition'; +export type { ResponderDecision } from './cognition'; +export type { ResponseDecision } from './cognition'; +export type { ResponseProposal } from './cognition'; +export type { SemanticSearchResult } from './cognition'; +export type { SemanticSearchToolsRequest } from './cognition'; +export type { SharedAnalysis } from './cognition'; +export type { SharedAnalysisIntent } from './cognition'; +export type { SharedRagSourcePlan } from './cognition'; +export type { ShouldRespondRequest } from './cognition'; +export type { SiliconResidencyRequirement } from './cognition'; +export type { TargetSilicon } from './cognition'; +export type { ThreatDetectionReport } from './cognition'; +export type { ThreatEvidence } from './cognition'; +export type { ThreatFrame } from './cognition'; +export type { ThreatFrameKind } from './cognition'; +export type { ThreatPatternKind } from './cognition'; +export type { ThreatRefusalAuditPayload } from './cognition'; +export type { ThreatSeverity } from './cognition'; +export type { ThreatSignal } from './cognition'; +export type { ThroughputJob } from './cognition'; +export type { ThroughputLaneBudget } from './cognition'; +export type { ThroughputLease } from './cognition'; +export type { ThroughputLeaseRevocationPolicy } from './cognition'; +export type { ThroughputLeaseSnapshot } from './cognition'; +export type { TokenUsage } from './cognition'; +export type { ToolDescription } from './cognition'; +export type { ToolEmbedding } from './cognition'; +export type { ToolError } from './cognition'; +export type { ToolExecutionContext } from './cognition'; +export type { ToolInvocation } from './cognition'; +export type { ToolOutcome } from './cognition'; +export type { ValidateResponseDecision } from './cognition'; +export type { ValidateResponseRequest } from './cognition'; +export type { VisionDescribeOptions } from './cognition'; +export type { VisionDescribeRequest } from './cognition'; +export type { VisionDescription } from './cognition'; export * from './comms'; export * from './dataset'; -export * from './forge'; -export * from './genome'; +export * from './events'; +// forge: explicit exports (has duplicate types) +export type { AlloyHardware } from './forge'; +export type { AlloySource } from './forge'; +export type { BenchmarkDef } from './forge'; +export type { CorpusRef } from './forge'; +export type { ForgeArtifact } from './forge'; +export type { ForgeRecipe } from './forge'; +export type { HardwareProfile } from './forge'; +export type { PriorBaseline } from './forge'; +export type { QuantTier } from './forge'; +// genome: explicit exports (has duplicate types) +export type { AccessDenied } from './genome'; +export type { AcquireSource } from './genome'; +export type { ArtifactId } from './genome'; +export type { ArtifactRef } from './genome'; +export type { CandidateArtifact } from './genome'; +export type { CapabilityQuery } from './genome'; +export type { CompositionHint } from './genome'; +export type { CompositionRef } from './genome'; +export type { DomainHint } from './genome'; +export type { EngramRef } from './genome'; +export type { EvictionPolicy } from './genome'; +export type { EvictionRecord } from './genome'; +export type { FreshnessTarget } from './genome'; +export type { LoRALayerRef } from './genome'; +export type { MoEExpertRef } from './genome'; +export type { OutcomeWindow } from './genome'; +export type { PageFault } from './genome'; +export type { PageHandle } from './genome'; +export type { PageKind } from './genome'; +export type { PageOffset } from './genome'; +export type { PageRef } from './genome'; +export type { PeerId } from './genome'; +export type { PersonaId } from './genome'; +export type { Provenance } from './genome'; +export type { RankedPool } from './genome'; +export type { RecallBudget } from './genome'; +export type { RecallContext } from './genome'; +export type { RecallError } from './genome'; +export type { RecallScope } from './genome'; +export type { RecallScore } from './genome'; +export type { RecallScoreWeights } from './genome'; +export type { RecallTrace } from './genome'; +export type { ResidencyHint } from './genome'; +export type { ResidentPage } from './genome'; +export type { TaskKind } from './genome'; +export type { TierCapacity } from './genome'; +export type { TierError } from './genome'; +export type { TierRole } from './genome'; +export type { TrajectoryHint } from './genome'; +export type { TrustClass } from './genome'; +export type { WorkingSet } from './genome'; +export type { WorkingSetCapacity } from './genome'; +// governor: explicit exports (has duplicate types) +export type { CadenceMultipliers } from './governor'; +export type { CascadeAction } from './governor'; +export type { CascadeThresholds } from './governor'; +export type { ConcurrencyCaps } from './governor'; +export type { ConsolidationSchedule } from './governor'; +export type { FederationCadence } from './governor'; +export type { GovernorPolicy } from './governor'; +export type { GovernorSnapshot } from './governor'; +export type { HardwareClass } from './governor'; +export type { PowerSource } from './governor'; +export type { PressureSignal } from './governor'; +export type { SpeculationLevel } from './governor'; +export type { ThermalClass } from './governor'; +export type { ThermalSeverity } from './governor'; +export type { TierSizes } from './governor'; export * from './gpu'; -export * from './grid'; +// grid: explicit exports (has duplicate types) +export type { GridNode } from './grid'; +export type { NodeCapability } from './grid'; +export type { TransportAddress } from './grid'; +export type { TrustLevel } from './grid'; export * from './inference'; +// inference_capability: explicit exports (has duplicate types) +export type { BackendChoice } from './inference_capability'; +export type { BlockReason } from './inference_capability'; +export type { InferenceCapability } from './inference_capability'; +export type { InferenceKind } from './inference_capability'; +export type { LatencyClass } from './inference_capability'; +export type { QwenModelMetadata } from './inference_capability'; +export type { ResidencyEvidence } from './inference_capability'; +export type { ResidencyGateResult } from './inference_capability'; +// inference_llm: explicit exports (has duplicate types) +export type { CompositionPlan } from './inference_llm'; +export type { FirstTokenEmitted } from './inference_llm'; +export type { GenerationBudget } from './inference_llm'; +export type { InferenceComplete } from './inference_llm'; +export type { InferenceRequest } from './inference_llm'; +export type { InferenceRequestId } from './inference_llm'; +export type { ResidencyFault } from './inference_llm'; +export type { SamplingParams } from './inference_llm'; export * from './ipc'; export * from './live'; export * from './logger'; diff --git a/src/shared/generated/inference_capability/index.ts b/src/shared/generated/inference_capability/index.ts index 8641dff52..a7db9243f 100644 --- a/src/shared/generated/inference_capability/index.ts +++ b/src/shared/generated/inference_capability/index.ts @@ -1,6 +1,6 @@ // Auto-generated barrel export — do not edit manually -// Source: workers/continuum-core/src/inference_capability/types.rs (ts-rs) -// Re-generate: cargo test --lib --features metal,accelerate inference_capability:: +// Source: generator/generate-rust-bindings.ts +// Re-generate: npx tsx generator/generate-rust-bindings.ts export type { BackendChoice } from './BackendChoice'; export type { BlockReason } from './BlockReason'; diff --git a/src/system/core/shared/Events.ts b/src/system/core/shared/Events.ts index 44d443bca..fb26a3e6e 100644 --- a/src/system/core/shared/Events.ts +++ b/src/system/core/shared/Events.ts @@ -21,6 +21,10 @@ import { RouterRegistry } from './RouterRegistry'; import { BaseEntity } from '../../data/entities/BaseEntity'; import { ElegantSubscriptionParser, type SubscriptionFilter } from '../../events/shared/ElegantSubscriptionParser'; import { jtagWindow, jtagGlobal } from '../types/GlobalAugmentations'; +// L1-1: event-class registry — hot-path sync peek for transport hints. +// Async warm-up is delegated so the first emit on an undeclared class +// doesn't block the emit; the next emit benefits from the warm cache. +import { peekEventClassCache, getEventClass } from '../../events/shared/EventClass'; // Verbose logging helper (works in both browser and server) const verbose = () => { @@ -168,6 +172,26 @@ export class Events { } } + // L1-1: consult the event-class registry. Sync peek only — the hot + // emit path can't afford an IPC round-trip per call. If the class + // is declared and cached, attach the hints to the payload so + // downstream transports (L1-2 AircEventTransport) can route it. + // If the cache is cold, kick off a fire-and-forget warm-up; the + // NEXT emit benefits. If the class is undeclared, no hints attached + // and behavior is identical to pre-L1-1 (local + WebSocket only). + const cachedClass = peekEventClassCache(eventName); + if (cachedClass === undefined) { + // Fire-and-forget warm-up. We deliberately do NOT await — the + // current emit goes through with no hints; subsequent emits hit + // the warm cache. Errors are surfaced (NOT swallowed) so a broken + // IPC manifests as a visible warning rather than mysteriously-missing + // routing hints. + getEventClass(eventName).catch((err: unknown) => { + const msg = err instanceof Error ? err.message : String(err); + console.warn(`[Events] EventClass lookup failed for '${eventName}': ${msg}`); + }); + } + // Router found - use full EventBridge routing // Create event payload const eventPayload: EventBridgePayload = { @@ -183,7 +207,19 @@ export class Events { data: eventData as Record, originSessionId: options.sessionId ?? context.uuid, originContextUUID: context.uuid, - timestamp: new Date().toISOString() + timestamp: new Date().toISOString(), + ...(cachedClass + ? { + eventClass: { + name: cachedClass.name, + broadcast: cachedClass.broadcast, + channel: cachedClass.channel, + schemaVersion: cachedClass.schemaVersion, + onUnknownSchema: cachedClass.onUnknownSchema, + description: cachedClass.description, + }, + } + : {}), }; // Create event message diff --git a/src/system/events/index.ts b/src/system/events/index.ts index b0e2135ab..e226b4bef 100644 --- a/src/system/events/index.ts +++ b/src/system/events/index.ts @@ -3,4 +3,19 @@ */ export { SYSTEM_EVENTS, type SystemEventData, type SystemEventName } from './shared/SystemEvents'; -export { EventManager, type EventsInterface } from './shared/JTAGEventSystem'; \ No newline at end of file +export { EventManager, type EventsInterface } from './shared/JTAGEventSystem'; + +// L1-1: Event-class declaration registry (Rust-truth, TS-cached). +// See docs/grid/GRID-MIGRATION-ROADMAP.md, GRID-BUS-ARCHITECTURE §2.2. +export { + declareEventClass, + getEventClass, + peekEventClassCache, + listEventClasses, + resolveEventChannel, + _resetEventClassCacheForTests, + type EventClassConfig, + type EventClassChannelStrategy, + type EventClassUnknownSchemaPolicy, + type ResolvedEventClassConfig, +} from './shared/EventClass'; \ No newline at end of file diff --git a/src/system/events/shared/EventClass.ts b/src/system/events/shared/EventClass.ts new file mode 100644 index 000000000..310a5710a --- /dev/null +++ b/src/system/events/shared/EventClass.ts @@ -0,0 +1,231 @@ +/** + * EventClass — thin TS shim over the Rust event-class registry. + * + * Roadmap item L1-1 (see docs/grid/GRID-MIGRATION-ROADMAP.md). + * Spec: GRID-BUS-ARCHITECTURE §2.2 (continuum#1439). + * + * Native-truth-thin-SDK-per-language: declarations are stored canonically + * in Rust (`crate::events::event_class_registry`). This module is the + * thin TS wrapper: + * + * 1. Re-exports the generated wire types (single source of truth). + * 2. Provides `declareEventClass(name, config)` — typed wrapper that + * calls the Rust `events/declare-class` IPC via `RustCoreIPCClient`. + * 3. Provides `getEventClass(name)` — read-through cache for the hot + * `Events.emit()` path. First lookup hits the registry once via IPC, + * result is cached for the lifetime of the process. Declarations + * are immutable once made (conflicting re-declare throws on the + * Rust side), so cache-invalidation isn't needed. + * 4. Provides `resolveEventChannel(name, payload)` — the airc transport + * consults this at emit time. Channel resolution is payload-dependent + * (ByRoomId / ByPeerId), so this can't be precomputed — but the + * class config it reads from IS cached. + * + * Why local cache: `Events.emit()` is in the hot path. A round-trip to + * Rust on every emit would add ~1ms per event. With a local read-through + * cache, only the first lookup pays IPC; everything after is a Map.get. + * + * What the cache does NOT do: it does not mutate. All declarations go + * through the IPC. Two processes that both call `declareEventClass` + * with conflicting configs will get one success + one error from the + * Rust registry — the cache cannot mask this. + * + * Mutability semantics: declarations are append-only. Once a class is + * declared in Rust, identical re-declarations succeed (idempotent); + * conflicting re-declarations throw. The cache therefore never has to + * invalidate — what it has is final. + * + * Why this bypasses `Commands.execute()`: the registry is a foundational + * primitive — declared event classes are what `Events.emit()` consults + * to know whether/where to broadcast. Going through Commands.execute() + * here would create a layering inversion (the bus would consult event + * metadata that requires the bus to fetch). Direct IPC keeps the + * dependency one-way. The CLI/introspection surface (`grid/show-event-classes`) + * can be added as a separate TS Command when needed (L4 roadmap item). + */ + +// Use a dynamic import to dodge the shared/server divide — this module +// lives in `shared/` but the RustCoreIPCClient is server-only. Browser +// callers shouldn't be declaring event classes (they consume the bus, +// they don't shape it), but they may import the *types* from here. +import type { + EventClassConfig, + EventClassChannelStrategy, + EventClassUnknownSchemaPolicy, + ResolvedEventClassConfig, +} from '@shared/generated/events'; + +// Re-export the generated wire types so callers can import them from +// `@system/events/shared/EventClass` (a stable path) without reaching +// into `@shared/generated/events` directly. +export type { + EventClassConfig, + EventClassChannelStrategy, + EventClassUnknownSchemaPolicy, + ResolvedEventClassConfig, +}; + +// ─── IPC client access (server-only, lazy-loaded) ─────────────────────── + +interface RustIPCClient { + eventsDeclareClass(params: EventClassConfig & { name: string }): Promise; + eventsGetClass(name: string): Promise; + eventsListClasses(): Promise; + eventsResolveChannel(name: string, payload: Record): Promise; +} + +let cachedClientPromise: Promise | null = null; + +async function getRustClient(): Promise { + if (cachedClientPromise) return cachedClientPromise; + cachedClientPromise = (async (): Promise => { + // Dynamic import so this module stays loadable in browser bundles + // (where the import would fail). Browser consumers should only + // import types from here, never call the imperative functions. + const mod = await import('../../../workers/continuum-core/bindings/RustCoreIPC'); + const client = await mod.RustCoreIPCClient.getInstanceAsync(); + return client as unknown as RustIPCClient; + })(); + return cachedClientPromise; +} + +// ─── Read-through cache ───────────────────────────────────────────────── + +/** + * Process-local cache of resolved event-class configs. Keyed by class name. + * + * Three states represented: + * - Missing key — never looked up. + * - `null` value — looked up; Rust said "not declared". + * - `ResolvedEventClassConfig` — looked up; declared. + * + * The `null` case is cached separately so a hot-path emit on an undeclared + * class doesn't keep paying IPC. + */ +const classCache = new Map(); + +/** + * In-flight dedup — if two callers ask for the same class concurrently + * before the first IPC returns, they share one round-trip. + */ +const inFlight = new Map>(); + +/** + * Test-only: clear the local cache. Production code does not need this — + * declarations are append-only and the cache never goes stale. Used by + * unit tests that exercise the IPC path repeatedly with different state. + */ +export function _resetEventClassCacheForTests(): void { + classCache.clear(); + inFlight.clear(); + cachedClientPromise = null; +} + +// ─── Public API ───────────────────────────────────────────────────────── + +/** + * Register an event class. Idempotent for identical re-declarations; + * throws on conflicting re-declarations (wire-contract integrity). + * + * Most callers declare their classes once at module-load time: + * + * await declareEventClass('presence:peer-manifest', { + * broadcast: true, + * channel: 'global', + * schemaVersion: 'v1', + * description: 'Peer-manifest advertisements (BGP-style route ads)', + * }); + */ +export async function declareEventClass( + name: string, + config: EventClassConfig, +): Promise { + const client = await getRustClient(); + const resolved = await client.eventsDeclareClass({ name, ...config }); + // Prime the cache with the canonical form so the very next emit + // doesn't have to round-trip back. + classCache.set(name, resolved); + return resolved; +} + +/** + * Look up a class's resolved config, with local read-through caching. + * + * Returns `null` when the class is undeclared — callers fall back to + * default backward-compat behavior (local + WebSocket only, no airc). + * The `null` result is itself cached so undeclared classes don't keep + * paying IPC on the hot path. + */ +export async function getEventClass(name: string): Promise { + if (classCache.has(name)) { + return classCache.get(name) ?? null; + } + const pending = inFlight.get(name); + if (pending) return pending; + + const lookup = (async (): Promise => { + try { + const client = await getRustClient(); + const result = await client.eventsGetClass(name); + classCache.set(name, result ?? null); + return result ?? null; + } finally { + inFlight.delete(name); + } + })(); + inFlight.set(name, lookup); + return lookup; +} + +/** + * Synchronous cache peek. Returns: + * - `ResolvedEventClassConfig` if cached + declared + * - `null` if cached + undeclared + * - `undefined` if not yet looked up + * + * Useful for the hot emit-path: if the class is already cached, emit can + * make a sync decision; if not, emit either falls back to default + * behavior or kicks off an async lookup. Whichever is right for the + * caller's latency budget. + */ +export function peekEventClassCache(name: string): ResolvedEventClassConfig | null | undefined { + return classCache.get(name); +} + +/** + * Snapshot of all declared classes — fresh from the registry, NOT from + * the local cache. Used by introspection commands (`grid/show-event-classes`) + * and by startup paths that prime the cache. + * + * Side effect: populates the cache with every class returned, so + * subsequent `peekEventClassCache` / `getEventClass` calls hit local + * memory. + */ +export async function listEventClasses(): Promise { + const client = await getRustClient(); + const list = await client.eventsListClasses(); + for (const cls of list) { + classCache.set(cls.name, cls); + } + return list; +} + +/** + * Resolve the airc channel an emit of `name` should land on. + * + * Throws if: + * - The class isn't declared. + * - The class is `broadcast: false` (no channel to resolve). + * - The class's channel strategy is payload-dependent and the payload + * doesn't carry the required field (e.g. ByRoomId without `roomId`). + * + * The L1-2 AircEventTransport consults this at emit time to decide + * which gist / channel to write the event to. + */ +export async function resolveEventChannel( + name: string, + payload: Record, +): Promise { + const client = await getRustClient(); + return client.eventsResolveChannel(name, payload); +} diff --git a/src/system/events/shared/EventSystemTypes.ts b/src/system/events/shared/EventSystemTypes.ts index 82f318d86..d5f42be46 100644 --- a/src/system/events/shared/EventSystemTypes.ts +++ b/src/system/events/shared/EventSystemTypes.ts @@ -49,6 +49,24 @@ export interface EventBridgePayload extends JTAGPayload { originSessionId: UUID; originContextUUID: UUID; // Required - no optional context timestamp: string; + /** + * Optional event-class hints from the L1-1 registry. Present when the + * eventName has been declared via `declareEventClass()` and the local + * cache was warm at emit time. Downstream transports (L1-2 AircEventTransport) + * read this to decide which channel/transport the event should land on. + * When absent, transports fall back to default behavior (local + WebSocket). + * Shape mirrors `ResolvedEventClassConfig` from `@shared/generated/events` + * but typed here loosely to keep this types-only module free of the + * generated-types dependency cycle. + */ + eventClass?: { + name: string; + broadcast: boolean; + channel: string; + schemaVersion: string; + onUnknownSchema: string; + description: string; + }; } /** diff --git a/src/tests/unit/core/event-class-registry.test.ts b/src/tests/unit/core/event-class-registry.test.ts new file mode 100644 index 000000000..2131830f1 --- /dev/null +++ b/src/tests/unit/core/event-class-registry.test.ts @@ -0,0 +1,213 @@ +/** + * EventClass — TS thin-SDK unit tests. + * + * Validates the cache behavior + the wire-shape integration with the Rust + * registry via a mock IPC client (so this test doesn't require the Rust + * binary to be running). + * + * Roadmap item L1-1 (see docs/grid/GRID-MIGRATION-ROADMAP.md). + * + * Suites are split into multiple top-level `describe` blocks (one per + * public function) to stay under the max-lines-per-function lint limit. + * Common per-test mock reset lives in `resetMocks` below. + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import type { ResolvedEventClassConfig } from '@shared/generated/events'; + +// Mock the RustCoreIPC module BEFORE importing EventClass. +// EventClass dynamic-imports the IPC client, so the mock has to be in +// place by the time the dynamic import resolves. +const mockEventsDeclareClass = vi.fn(); +const mockEventsGetClass = vi.fn(); +const mockEventsListClasses = vi.fn(); +const mockEventsResolveChannel = vi.fn(); + +vi.mock('../../../workers/continuum-core/bindings/RustCoreIPC', () => { + const mockClient = { + eventsDeclareClass: mockEventsDeclareClass, + eventsGetClass: mockEventsGetClass, + eventsListClasses: mockEventsListClasses, + eventsResolveChannel: mockEventsResolveChannel, + }; + return { + RustCoreIPCClient: { + getInstanceAsync: vi.fn(() => Promise.resolve(mockClient)), + }, + }; +}); + +import { + declareEventClass, + getEventClass, + peekEventClassCache, + listEventClasses, + resolveEventChannel, + _resetEventClassCacheForTests, +} from '@system/events/shared/EventClass'; + +function makeResolved(name: string, broadcast = false, channel: 'local' | 'global' = 'local'): ResolvedEventClassConfig { + return { + name, + broadcast, + channel, + schemaVersion: 'v1', + onUnknownSchema: 'fail', + description: '', + }; +} + +// Per-suite reset — extracted so each top-level describe stays under the +// max-lines-per-function lint limit while keeping a clean fixture. +function resetMocks(): void { + _resetEventClassCacheForTests(); + mockEventsDeclareClass.mockReset(); + mockEventsGetClass.mockReset(); + mockEventsListClasses.mockReset(); + mockEventsResolveChannel.mockReset(); +} + +describe('EventClass — declareEventClass', () => { + beforeEach(resetMocks); + + it('forwards to Rust IPC + primes the cache', async () => { + const resolved = makeResolved('test:local-class'); + mockEventsDeclareClass.mockResolvedValueOnce(resolved); + + const result = await declareEventClass('test:local-class', { + broadcast: false, + schemaVersion: 'v1', + }); + + expect(result).toEqual(resolved); + expect(mockEventsDeclareClass).toHaveBeenCalledWith({ + name: 'test:local-class', + broadcast: false, + schemaVersion: 'v1', + }); + // Cache primed — peek hits without another IPC call. + expect(peekEventClassCache('test:local-class')).toEqual(resolved); + }); + + it('propagates wire-contract errors (conflicting redeclare)', async () => { + mockEventsDeclareClass.mockRejectedValueOnce(new Error('conflicting redeclaration')); + await expect( + declareEventClass('test:conflict', { broadcast: false, schemaVersion: 'v1' }), + ).rejects.toThrow(/conflicting redeclaration/); + }); +}); + +describe('EventClass — getEventClass (read-through cache)', () => { + beforeEach(resetMocks); + + it('caches a successful lookup so the second call skips IPC', async () => { + const resolved = makeResolved('test:cached'); + mockEventsGetClass.mockResolvedValueOnce(resolved); + + const first = await getEventClass('test:cached'); + const second = await getEventClass('test:cached'); + + expect(first).toEqual(resolved); + expect(second).toEqual(resolved); + expect(mockEventsGetClass).toHaveBeenCalledTimes(1); + }); + + it('caches the null (undeclared) case', async () => { + mockEventsGetClass.mockResolvedValueOnce(null); + + const first = await getEventClass('test:never-declared'); + const second = await getEventClass('test:never-declared'); + + expect(first).toBeNull(); + expect(second).toBeNull(); + // Undeclared MUST also be cached — otherwise the hot path would + // keep paying IPC for events whose class will never be declared. + expect(mockEventsGetClass).toHaveBeenCalledTimes(1); + }); + + it('dedups in-flight concurrent lookups', async () => { + const resolved = makeResolved('test:concurrent'); + // Resolve the IPC promise on the next tick so two callers race. + mockEventsGetClass.mockImplementationOnce( + () => new Promise(resolve => setTimeout(() => resolve(resolved), 5)), + ); + + const [a, b] = await Promise.all([ + getEventClass('test:concurrent'), + getEventClass('test:concurrent'), + ]); + + expect(a).toEqual(resolved); + expect(b).toEqual(resolved); + // Both callers share ONE IPC round-trip. + expect(mockEventsGetClass).toHaveBeenCalledTimes(1); + }); +}); + +describe('EventClass — peekEventClassCache (sync hot path)', () => { + beforeEach(resetMocks); + + it('returns undefined when never looked up', () => { + expect(peekEventClassCache('test:cold')).toBeUndefined(); + }); + + it('returns the cached resolved config after declare', async () => { + const resolved = makeResolved('test:warm'); + mockEventsDeclareClass.mockResolvedValueOnce(resolved); + + await declareEventClass('test:warm', { broadcast: false, schemaVersion: 'v1' }); + + // Sync — no await on peek. This is the property the hot + // emit path relies on. + expect(peekEventClassCache('test:warm')).toEqual(resolved); + }); + + it('returns null when the cached lookup was undeclared', async () => { + mockEventsGetClass.mockResolvedValueOnce(null); + + await getEventClass('test:undecl-warm'); + + expect(peekEventClassCache('test:undecl-warm')).toBeNull(); + }); +}); + +describe('EventClass — listEventClasses', () => { + beforeEach(resetMocks); + + it('returns all classes + warms the cache for each', async () => { + const a = makeResolved('test:list-a'); + const b = makeResolved('test:list-b', true, 'global'); + mockEventsListClasses.mockResolvedValueOnce([a, b]); + + const list = await listEventClasses(); + + expect(list).toEqual([a, b]); + // After list, both classes are warm — emit hot path no longer + // pays IPC for them. + expect(peekEventClassCache('test:list-a')).toEqual(a); + expect(peekEventClassCache('test:list-b')).toEqual(b); + }); +}); + +describe('EventClass — resolveEventChannel', () => { + beforeEach(resetMocks); + + it('forwards to Rust IPC and returns the channel string', async () => { + mockEventsResolveChannel.mockResolvedValueOnce('global'); + + const channel = await resolveEventChannel('test:resolve-global', { foo: 'bar' }); + + expect(channel).toBe('global'); + expect(mockEventsResolveChannel).toHaveBeenCalledWith('test:resolve-global', { foo: 'bar' }); + }); + + it('propagates IPC errors (e.g. ByRoomId missing payload field)', async () => { + mockEventsResolveChannel.mockRejectedValueOnce( + new Error("event class 'chat:posted' requires field 'roomId' in payload"), + ); + + await expect( + resolveEventChannel('chat:posted', {}), + ).rejects.toThrow(/requires field 'roomId'/); + }); +}); diff --git a/src/tsconfig.eslint.json b/src/tsconfig.eslint.json index 9c09c97f8..551461c4b 100644 --- a/src/tsconfig.eslint.json +++ b/src/tsconfig.eslint.json @@ -26,7 +26,8 @@ "test-path-aliases-runtime.ts" ], "files": [ - "tests/unit/chat-coordination-stream.test.ts" + "tests/unit/chat-coordination-stream.test.ts", + "tests/unit/core/event-class-registry.test.ts" ], "exclude": [ "node_modules", diff --git a/src/workers/continuum-core/bindings/RustCoreIPC.ts b/src/workers/continuum-core/bindings/RustCoreIPC.ts index 9ca7b15c4..c5c77efd5 100644 --- a/src/workers/continuum-core/bindings/RustCoreIPC.ts +++ b/src/workers/continuum-core/bindings/RustCoreIPC.ts @@ -55,6 +55,7 @@ import { AIMixin } from './modules/ai'; import { EmbeddingMixin } from './modules/embedding'; import { RuntimeMixin } from './modules/runtime'; import { GpuMixin } from './modules/gpu'; +import { EventsMixin } from './modules/events'; import { SentinelMixin } from './modules/sentinel'; import { ToolParsingMixin } from './modules/tool_parsing'; import { SystemResourceMixin } from './modules/system_resources'; @@ -122,8 +123,9 @@ const ComposedClient = GridMixin(PlasticityMixin(VisionCacheMixin(DatasetMixin( SentinelMixin( InferenceMixin( SystemResourceMixin( - GpuMixin( - RuntimeMixin( + EventsMixin( + GpuMixin( + RuntimeMixin( EmbeddingMixin( AIMixin( ModelsMixin( @@ -150,7 +152,7 @@ const ComposedClient = GridMixin(PlasticityMixin(VisionCacheMixin(DatasetMixin( ) ) ) -)))); +))))); /** * Full RustCoreIPCClient with all domain methods. diff --git a/src/workers/continuum-core/bindings/modules/events.ts b/src/workers/continuum-core/bindings/modules/events.ts new file mode 100644 index 000000000..c3619a026 --- /dev/null +++ b/src/workers/continuum-core/bindings/modules/events.ts @@ -0,0 +1,132 @@ +/** + * RustCoreIPC Events Module — event-class declaration registry. + * + * Roadmap item L1-1 (see docs/grid/GRID-MIGRATION-ROADMAP.md). + * Spec: GRID-BUS-ARCHITECTURE §2.2 (continuum#1439). + * + * The Rust crate `events::` is the canonical store. This mixin is the + * thin SDK wrapper — the TS thin shim at src/system/events/shared/ + * EventClass.ts caches reads locally for the hot emit-path but only + * mutates through here. + * + * Native-truth-thin-SDK-per-language: the names + meanings of fields + * are owned by Rust; ts-rs generates the wire types under + * `shared/generated/events/`. Methods on this mixin are just typed + * IPC wrappers — no business logic. + */ + +import type { RustCoreIPCClientBase } from './base'; +import type { + EventClassConfig, + ResolvedEventClassConfig, +} from '../../../../shared/generated/events'; + +// ============================================================================ +// IPC params + result shapes +// ============================================================================ + +/** + * Params for `events/declare-class` — the class name + flattened + * `EventClassConfig` (broadcast / channel / schemaVersion / etc.). + * + * The Rust handler uses `#[serde(flatten)]` so the config fields live + * at the top level of the request alongside `name`. + */ +export interface EventsDeclareClassParams extends EventClassConfig { + name: string; +} + +export interface EventsResolveChannelResult { + channel: string; +} + +// ============================================================================ +// Mixin +// ============================================================================ + +export interface EventsMixin { + /** + * Register a new event class. Idempotent for identical re-declarations; + * throws on conflicting re-declarations (wire-contract integrity — + * silently shifting transport behavior between callers would mask bugs). + * + * Returns the canonical, post-validation form (with all defaults filled). + */ + eventsDeclareClass(params: EventsDeclareClassParams): Promise; + + /** + * Look up a single class's resolved config. Returns `null` when + * undeclared — callers fall back to default backward-compat behavior + * (local + WebSocket only, no airc broadcast). + */ + eventsGetClass(name: string): Promise; + + /** + * Snapshot of all declared classes. Used by the TS-side cache on + * startup + by `grid/show-event-classes` introspection. + */ + eventsListClasses(): Promise; + + /** + * Resolve the airc channel for an emit. Used by the L1-2 + * AircEventTransport when it lands. Throws if the class isn't + * declared, isn't `broadcast: true`, or its payload-dependent + * channel strategy can't find the required field + * (e.g. ByRoomId without `roomId` in payload). + */ + eventsResolveChannel(name: string, payload: Record): Promise; +} + +// Mixin generic constraint mirrors the pattern in sibling mixins +// (GpuMixin, CognitionMixin, DatasetMixin). `any[]` is the only constructor +// signature TypeScript's mixin pattern accepts — `unknown[]` would reject +// subclass constructors with concrete arg types. +/* eslint-disable @typescript-eslint/no-explicit-any */ +export function EventsMixin RustCoreIPCClientBase>( + Base: T, +): T & (new (...args: any[]) => EventsMixin) { + return class extends Base implements EventsMixin { + async eventsDeclareClass(params: EventsDeclareClassParams): Promise { + const response = await this.request({ + command: 'events/declare-class', + ...params, + }); + if (!response.success) { + throw new Error(response.error ?? `events/declare-class failed for '${params.name}'`); + } + return response.result as ResolvedEventClassConfig; + } + + async eventsGetClass(name: string): Promise { + const response = await this.request({ command: 'events/get-class', name }); + if (!response.success) { + throw new Error(response.error ?? `events/get-class failed for '${name}'`); + } + // Rust returns JSON null when undeclared — surface as TS null, + // not undefined, so callers can distinguish "not declared" from + // "didn't ask yet." + return (response.result as ResolvedEventClassConfig | null) ?? null; + } + + async eventsListClasses(): Promise { + const response = await this.request({ command: 'events/list-classes' }); + if (!response.success) { + throw new Error(response.error ?? 'events/list-classes failed'); + } + return response.result as ResolvedEventClassConfig[]; + } + + async eventsResolveChannel(name: string, payload: Record): Promise { + const response = await this.request({ + command: 'events/resolve-channel', + name, + payload, + }); + if (!response.success) { + throw new Error(response.error ?? `events/resolve-channel failed for '${name}'`); + } + return (response.result as EventsResolveChannelResult).channel; + } + }; +} +/* eslint-enable @typescript-eslint/no-explicit-any */ diff --git a/src/workers/continuum-core/bindings/modules/index.ts b/src/workers/continuum-core/bindings/modules/index.ts index 172cff87a..e3f251826 100644 --- a/src/workers/continuum-core/bindings/modules/index.ts +++ b/src/workers/continuum-core/bindings/modules/index.ts @@ -52,6 +52,9 @@ export type { VisionCacheMixin as VisionCacheMixinInterface, VisionCacheEntry, V export { PlasticityMixin } from './plasticity'; export type { PlasticityMixin as PlasticityMixinInterface, PlasticityAnalyzeParams, PlasticityCompactParams, PlasticityTopologyParams } from './plasticity'; +export { EventsMixin } from './events'; +export type { EventsMixin as EventsMixinInterface, EventsDeclareClassParams, EventsResolveChannelResult } from './events'; + /** * Compose all mixins into a single client class. * Usage: const Client = composeClient(RustCoreIPCClientBase); diff --git a/src/workers/continuum-core/src/events/event_class.rs b/src/workers/continuum-core/src/events/event_class.rs new file mode 100644 index 000000000..cb1fbb2d2 --- /dev/null +++ b/src/workers/continuum-core/src/events/event_class.rs @@ -0,0 +1,302 @@ +//! EventClassConfig + validation. Pure types; no I/O, no registry mutation. +//! +//! Roadmap item L1-1 (see docs/grid/GRID-MIGRATION-ROADMAP.md). +//! Spec: GRID-BUS-ARCHITECTURE §2.2 (continuum#1439). +//! +//! ts-rs generates the TS bindings at `shared/generated/events/`. + +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use ts_rs::TS; + +/// Channel-strategy for an event class — how the event-name maps to an airc +/// channel when `broadcast: true`. The transport consults this at emit time. +/// +/// - `Local` — no broadcast (paired with `broadcast: false`). +/// - `Global` — mesh-wide single channel (e.g. `#presence`). +/// - `ByRoomId` — event payload must carry `roomId`; routed to that +/// room's airc channel. +/// - `ByPeerId` — event payload must carry `peerId`; routed to a +/// peer-targeted channel (DM-like). +/// - `Custom` — caller-supplied channel resolver runs at emit time. +/// (The resolver itself can't cross the wire — it's a per-process +/// function ref — so on the TS side the resolver is registered +/// separately from the Rust-canonical config.) +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export, export_to = "../../../shared/generated/events/EventClassChannelStrategy.ts")] +pub enum EventClassChannelStrategy { + Local, + Global, + ByRoomId, + ByPeerId, + Custom, +} + +/// Behavior when a subscriber receives an event with a `schemaVersion` +/// it doesn't recognize. Default `Fail` matches the standing project rule +/// of never silently swallowing evidence. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export, export_to = "../../../shared/generated/events/EventClassUnknownSchemaPolicy.ts")] +pub enum EventClassUnknownSchemaPolicy { + Warn, + #[default] + Fail, +} + +/// Caller-supplied event-class declaration. All optional fields fill with +/// conservative defaults (no broadcast, no airc cost). +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export, export_to = "../../../shared/generated/events/EventClassConfig.ts")] +pub struct EventClassConfig { + /// Distribute this event class through the airc transport in addition + /// to the local + WebSocket transports? + /// + /// `false` (default) — local + WebSocket only. Zero airc cost. + /// `true` — also durable on the airc log; reaches cross-machine + /// subscribers via the AircEventTransport (L1-2). + #[serde(default)] + pub broadcast: bool, + + /// How the event-name + payload map to an airc channel when broadcast + /// is `true`. Defaults to `Local` when `broadcast: false`, otherwise + /// required (validation throws on missing-when-broadcast). + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub channel: Option, + + /// Wire-format schema version. Subscribers fail loud on unknown + /// versions per `on_unknown_schema`. Bump when the payload shape + /// changes incompatibly. + pub schema_version: String, + + /// Action when a subscriber receives an event whose declared + /// `schemaVersion` doesn't match its build. Default `Fail`. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub on_unknown_schema: Option, + + /// Optional human-readable description for `grid/show-event-classes` + /// and similar introspection. Not load-bearing at runtime. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub description: Option, +} + +/// Canonical, post-validation form of an event-class declaration. +/// What the registry stores + what the TS side caches. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export, export_to = "../../../shared/generated/events/ResolvedEventClassConfig.ts")] +pub struct ResolvedEventClassConfig { + pub name: String, + pub broadcast: bool, + pub channel: EventClassChannelStrategy, + pub schema_version: String, + pub on_unknown_schema: EventClassUnknownSchemaPolicy, + pub description: String, +} + +/// Validation errors raised when resolving an `EventClassConfig`. Each +/// variant carries the event-class name so a multi-class declaration +/// sweep can report which one failed. +#[derive(Debug, Error)] +pub enum EventClassDeclareError { + #[error("EventClass name is required (non-empty string)")] + EmptyName, + + #[error("EventClass '{name}': schemaVersion is required (non-empty)")] + EmptySchemaVersion { name: String }, + + #[error( + "EventClass '{name}': broadcast: true requires an explicit non-local channel \ + (Global | ByRoomId | ByPeerId | Custom)" + )] + BroadcastWithoutChannel { name: String }, + + #[error( + "EventClass '{name}': channel: {channel:?} implies broadcast intent — \ + set broadcast: true OR drop the channel field" + )] + ChannelWithoutBroadcast { + name: String, + channel: EventClassChannelStrategy, + }, + + #[error( + "EventClass '{name}' already declared with a conflicting config. \ + Event-class declarations are wire contracts; conflicting declarations \ + would silently shift transport behavior between callers. \ + If the config needs to change, bump schemaVersion + update subscribers." + )] + ConflictingRedeclaration { name: String }, +} + +/// Resolve user-supplied config into the canonical internal form (fills +/// defaults, validates internal consistency). +pub fn resolve_event_class_config( + name: &str, + config: &EventClassConfig, +) -> Result { + if name.trim().is_empty() { + return Err(EventClassDeclareError::EmptyName); + } + if config.schema_version.trim().is_empty() { + return Err(EventClassDeclareError::EmptySchemaVersion { + name: name.to_string(), + }); + } + + let broadcast = config.broadcast; + let channel = config + .channel + .unwrap_or(if broadcast { + // Will fail validation below — broadcast requires explicit channel. + EventClassChannelStrategy::Local + } else { + EventClassChannelStrategy::Local + }); + + if broadcast && channel == EventClassChannelStrategy::Local { + return Err(EventClassDeclareError::BroadcastWithoutChannel { + name: name.to_string(), + }); + } + if !broadcast && channel != EventClassChannelStrategy::Local { + return Err(EventClassDeclareError::ChannelWithoutBroadcast { + name: name.to_string(), + channel, + }); + } + + Ok(ResolvedEventClassConfig { + name: name.to_string(), + broadcast, + channel, + schema_version: config.schema_version.clone(), + on_unknown_schema: config.on_unknown_schema.unwrap_or_default(), + description: config.description.clone().unwrap_or_default(), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn cfg_minimal_local() -> EventClassConfig { + EventClassConfig { + broadcast: false, + channel: None, + schema_version: "v1".into(), + on_unknown_schema: None, + description: None, + } + } + + fn cfg_broadcast_global() -> EventClassConfig { + EventClassConfig { + broadcast: true, + channel: Some(EventClassChannelStrategy::Global), + schema_version: "v1".into(), + on_unknown_schema: None, + description: None, + } + } + + #[test] + fn resolves_local_default() { + let r = resolve_event_class_config("widget:mounted", &cfg_minimal_local()).unwrap(); + assert_eq!(r.name, "widget:mounted"); + assert!(!r.broadcast); + assert_eq!(r.channel, EventClassChannelStrategy::Local); + assert_eq!(r.schema_version, "v1"); + assert_eq!(r.on_unknown_schema, EventClassUnknownSchemaPolicy::Fail); + } + + #[test] + fn resolves_broadcast_global() { + let r = resolve_event_class_config("presence:peer-manifest", &cfg_broadcast_global()) + .unwrap(); + assert!(r.broadcast); + assert_eq!(r.channel, EventClassChannelStrategy::Global); + } + + #[test] + fn rejects_empty_name() { + let err = resolve_event_class_config("", &cfg_minimal_local()).unwrap_err(); + assert!(matches!(err, EventClassDeclareError::EmptyName)); + } + + #[test] + fn rejects_empty_schema_version() { + let bad = EventClassConfig { + schema_version: "".into(), + ..cfg_minimal_local() + }; + let err = resolve_event_class_config("foo:bar", &bad).unwrap_err(); + assert!(matches!( + err, + EventClassDeclareError::EmptySchemaVersion { .. } + )); + } + + #[test] + fn rejects_broadcast_without_channel() { + let bad = EventClassConfig { + broadcast: true, + channel: None, + ..cfg_minimal_local() + }; + let err = resolve_event_class_config("chat:posted", &bad).unwrap_err(); + assert!(matches!( + err, + EventClassDeclareError::BroadcastWithoutChannel { .. } + )); + } + + #[test] + fn rejects_broadcast_with_local_channel() { + let bad = EventClassConfig { + broadcast: true, + channel: Some(EventClassChannelStrategy::Local), + ..cfg_minimal_local() + }; + let err = resolve_event_class_config("chat:posted", &bad).unwrap_err(); + assert!(matches!( + err, + EventClassDeclareError::BroadcastWithoutChannel { .. } + )); + } + + #[test] + fn rejects_channel_without_broadcast() { + let bad = EventClassConfig { + broadcast: false, + channel: Some(EventClassChannelStrategy::Global), + ..cfg_minimal_local() + }; + let err = resolve_event_class_config("chat:posted", &bad).unwrap_err(); + assert!(matches!( + err, + EventClassDeclareError::ChannelWithoutBroadcast { .. } + )); + } + + #[test] + fn defaults_on_unknown_schema_to_fail() { + let r = resolve_event_class_config("foo:bar", &cfg_minimal_local()).unwrap(); + assert_eq!(r.on_unknown_schema, EventClassUnknownSchemaPolicy::Fail); + } + + #[test] + fn honors_explicit_on_unknown_schema_warn() { + let cfg = EventClassConfig { + on_unknown_schema: Some(EventClassUnknownSchemaPolicy::Warn), + ..cfg_minimal_local() + }; + let r = resolve_event_class_config("foo:bar", &cfg).unwrap(); + assert_eq!(r.on_unknown_schema, EventClassUnknownSchemaPolicy::Warn); + } +} diff --git a/src/workers/continuum-core/src/events/event_class_registry.rs b/src/workers/continuum-core/src/events/event_class_registry.rs new file mode 100644 index 000000000..f1bfc6de8 --- /dev/null +++ b/src/workers/continuum-core/src/events/event_class_registry.rs @@ -0,0 +1,415 @@ +//! EventClassRegistry — process-global, thread-safe registry of declared +//! event classes. +//! +//! Roadmap item L1-1 (see docs/grid/GRID-MIGRATION-ROADMAP.md). +//! Spec: GRID-BUS-ARCHITECTURE §2.2 (continuum#1439). +//! +//! Module-singleton holding `name → ResolvedEventClassConfig`. Consulted by: +//! - The IPC handler in `crate::modules::events` for declare/get/list +//! - Future AircEventTransport (L1-2) for channel resolution +//! - The TS-side cache, which hydrates via IPC on startup +//! +//! Registration is idempotent for identical re-declarations; conflicting +//! re-declarations throw — event classes are wire contracts. + +use crate::events::event_class::{ + resolve_event_class_config, EventClassChannelStrategy, EventClassConfig, + EventClassDeclareError, ResolvedEventClassConfig, +}; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::OnceLock; +use thiserror::Error; + +/// Errors raised when registering a class via the registry. Validation +/// errors from `resolve_event_class_config` are wrapped; the conflicting- +/// redeclaration check is registry-side. +#[derive(Debug, Error)] +pub enum EventClassRegistryError { + #[error(transparent)] + Declare(#[from] EventClassDeclareError), +} + +/// Errors raised when resolving the airc channel for an event emission. +/// Happens at emit time (L1-2+), not at declare time. +#[derive(Debug, Error)] +pub enum EventClassChannelResolveError { + #[error("EventClass '{0}' is not declared")] + Undeclared(String), + + #[error("EventClass '{0}': declared with broadcast: false; airc channel resolution skipped")] + NotBroadcast(String), + + #[error( + "EventClass '{name}': channel: {channel:?} requires payload.{required_field} to be present and non-empty" + )] + MissingPayloadField { + name: String, + channel: EventClassChannelStrategy, + required_field: &'static str, + }, + + #[error( + "EventClass '{name}': channel: Custom requires a process-local resolver — \ + declared via Rust IPC but no Rust-side resolver wired. (TS-side custom \ + resolvers run in the TS process; the Rust registry only records the channel \ + strategy.)" + )] + CustomResolverUnsupported { name: String }, +} + +#[derive(Debug, Clone)] +struct RegistryEntry { + config: ResolvedEventClassConfig, + /// Canonical form used for idempotent-re-declaration check. + canonical: String, +} + +pub struct EventClassRegistry { + classes: RwLock>, +} + +impl EventClassRegistry { + pub fn new() -> Self { + Self { + classes: RwLock::new(HashMap::new()), + } + } + + /// Declare an event class. Idempotent for identical re-declarations; + /// raises `ConflictingRedeclaration` on a name collision with different + /// config (per the wire-contract integrity invariant). + pub fn declare( + &self, + name: &str, + config: &EventClassConfig, + ) -> Result { + let resolved = resolve_event_class_config(name, config)?; + let canonical = canonicalize(&resolved); + + let mut classes = self.classes.write(); + if let Some(existing) = classes.get(name) { + if existing.canonical != canonical { + return Err(EventClassRegistryError::Declare( + EventClassDeclareError::ConflictingRedeclaration { + name: name.to_string(), + }, + )); + } + return Ok(existing.config.clone()); + } + classes.insert( + name.to_string(), + RegistryEntry { + config: resolved.clone(), + canonical, + }, + ); + Ok(resolved) + } + + /// Look up the resolved config for an event name. Returns `None` when + /// no class is declared — caller treats this as "use default backward- + /// compat behavior" (local + WebSocket EventBridge, no airc broadcast). + pub fn get(&self, name: &str) -> Option { + self.classes.read().get(name).map(|e| e.config.clone()) + } + + /// Snapshot of all declared classes. Order is unspecified — caller + /// sorts if needed (e.g. for stable introspection output). + pub fn list(&self) -> Vec { + self.classes + .read() + .values() + .map(|e| e.config.clone()) + .collect() + } + + /// Resolve the airc channel name for an emit, given the event name + + /// the event payload (as a serde_json::Value so the registry doesn't + /// need a per-class type). + /// + /// `Custom` channel strategy is unsupported at the Rust-canonical + /// layer — custom resolvers are process-local functions that can't + /// cross the wire; the TS side handles its own custom resolvers in- + /// process, then submits the resolved channel via a different IPC if + /// it needs Rust to know the result. + pub fn resolve_channel( + &self, + name: &str, + payload: &serde_json::Value, + ) -> Result { + let entry = self + .classes + .read() + .get(name) + .cloned() + .ok_or_else(|| EventClassChannelResolveError::Undeclared(name.to_string()))?; + if !entry.config.broadcast { + return Err(EventClassChannelResolveError::NotBroadcast(name.to_string())); + } + match entry.config.channel { + EventClassChannelStrategy::Global => Ok("global".to_string()), + EventClassChannelStrategy::ByRoomId => { + extract_string_field(payload, "roomId").ok_or_else(|| { + EventClassChannelResolveError::MissingPayloadField { + name: name.to_string(), + channel: EventClassChannelStrategy::ByRoomId, + required_field: "roomId", + } + }) + } + EventClassChannelStrategy::ByPeerId => { + extract_string_field(payload, "peerId").ok_or_else(|| { + EventClassChannelResolveError::MissingPayloadField { + name: name.to_string(), + channel: EventClassChannelStrategy::ByPeerId, + required_field: "peerId", + } + }) + } + EventClassChannelStrategy::Custom => { + Err(EventClassChannelResolveError::CustomResolverUnsupported { + name: name.to_string(), + }) + } + EventClassChannelStrategy::Local => Err(EventClassChannelResolveError::NotBroadcast( + name.to_string(), + )), + } + } + + /// Test-only — clears all declarations. Production code never calls this. + #[cfg(test)] + pub fn clear(&self) { + self.classes.write().clear(); + } +} + +impl Default for EventClassRegistry { + fn default() -> Self { + Self::new() + } +} + +/// Process-global registry singleton. Initialized lazily on first access. +fn registry_singleton() -> &'static EventClassRegistry { + static REGISTRY: OnceLock = OnceLock::new(); + REGISTRY.get_or_init(EventClassRegistry::new) +} + +/// Module-level accessor for the process-global registry. Returns a +/// reference rather than a clone — the registry is `RwLock`-internally +/// synchronized. +pub fn event_class_registry() -> &'static EventClassRegistry { + registry_singleton() +} + +/// Convenience wrapper for the singleton's `declare`. Mirrors the +/// JavaScript-side `declareEventClass()` helper. +pub fn declare_event_class( + name: &str, + config: &EventClassConfig, +) -> Result { + registry_singleton().declare(name, config) +} + +/// Convenience wrapper for the singleton's `get`. +pub fn lookup_event_class(name: &str) -> Option { + registry_singleton().get(name) +} + +/// Convenience wrapper for the singleton's `list`. +pub fn list_event_classes() -> Vec { + registry_singleton().list() +} + +/// Convenience wrapper for the singleton's `resolve_channel`. +pub fn resolve_event_class_channel( + name: &str, + payload: &serde_json::Value, +) -> Result { + registry_singleton().resolve_channel(name, payload) +} + +// ─── Helpers ────────────────────────────────────────────────────────── + +fn canonicalize(c: &ResolvedEventClassConfig) -> String { + // Stable canonical form for the idempotent-redeclaration check. + // Excludes `name` (it's the registry key) and `description` (free + // text; not load-bearing for the contract). + serde_json::json!({ + "broadcast": c.broadcast, + "channel": c.channel, + "schemaVersion": c.schema_version, + "onUnknownSchema": c.on_unknown_schema, + }) + .to_string() +} + +fn extract_string_field(payload: &serde_json::Value, field: &str) -> Option { + payload + .as_object()? + .get(field)? + .as_str() + .filter(|s| !s.is_empty()) + .map(str::to_string) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn local_cfg() -> EventClassConfig { + EventClassConfig { + broadcast: false, + channel: None, + schema_version: "v1".into(), + on_unknown_schema: None, + description: None, + } + } + + fn broadcast_global_cfg() -> EventClassConfig { + EventClassConfig { + broadcast: true, + channel: Some(EventClassChannelStrategy::Global), + schema_version: "v1".into(), + on_unknown_schema: None, + description: Some("test class".into()), + } + } + + fn broadcast_by_room_cfg() -> EventClassConfig { + EventClassConfig { + broadcast: true, + channel: Some(EventClassChannelStrategy::ByRoomId), + schema_version: "v1".into(), + on_unknown_schema: None, + description: None, + } + } + + #[test] + fn declare_get_roundtrip() { + let r = EventClassRegistry::new(); + let resolved = r.declare("chat:posted", &broadcast_global_cfg()).unwrap(); + assert!(resolved.broadcast); + + let fetched = r.get("chat:posted").unwrap(); + assert_eq!(fetched.name, "chat:posted"); + assert_eq!(fetched.channel, EventClassChannelStrategy::Global); + assert_eq!(fetched.schema_version, "v1"); + assert_eq!(fetched.description, "test class"); + } + + #[test] + fn get_undeclared_returns_none() { + let r = EventClassRegistry::new(); + assert!(r.get("never:declared").is_none()); + } + + #[test] + fn idempotent_redeclaration_succeeds() { + let r = EventClassRegistry::new(); + let a = r.declare("foo:bar", &local_cfg()).unwrap(); + let b = r.declare("foo:bar", &local_cfg()).unwrap(); + assert_eq!(a, b); + // Only one entry in the list. + assert_eq!(r.list().len(), 1); + } + + #[test] + fn conflicting_redeclaration_errors() { + let r = EventClassRegistry::new(); + r.declare("foo:bar", &local_cfg()).unwrap(); + let conflict = EventClassConfig { + broadcast: true, + channel: Some(EventClassChannelStrategy::Global), + schema_version: "v2".into(), + on_unknown_schema: None, + description: None, + }; + let err = r.declare("foo:bar", &conflict).unwrap_err(); + assert!(matches!( + err, + EventClassRegistryError::Declare(EventClassDeclareError::ConflictingRedeclaration { .. }) + )); + } + + #[test] + fn list_returns_all_declared() { + let r = EventClassRegistry::new(); + r.declare("a:b", &local_cfg()).unwrap(); + r.declare("c:d", &broadcast_global_cfg()).unwrap(); + let mut names: Vec = r.list().iter().map(|c| c.name.clone()).collect(); + names.sort(); + assert_eq!(names, vec!["a:b", "c:d"]); + } + + #[test] + fn resolve_channel_global() { + let r = EventClassRegistry::new(); + r.declare("presence:peer-manifest", &broadcast_global_cfg()) + .unwrap(); + let ch = r + .resolve_channel("presence:peer-manifest", &serde_json::json!({})) + .unwrap(); + assert_eq!(ch, "global"); + } + + #[test] + fn resolve_channel_by_room_id() { + let r = EventClassRegistry::new(); + r.declare("chat:posted", &broadcast_by_room_cfg()).unwrap(); + let ch = r + .resolve_channel( + "chat:posted", + &serde_json::json!({ "roomId": "room-abc-123" }), + ) + .unwrap(); + assert_eq!(ch, "room-abc-123"); + } + + #[test] + fn resolve_channel_by_room_id_missing_field() { + let r = EventClassRegistry::new(); + r.declare("chat:posted", &broadcast_by_room_cfg()).unwrap(); + let err = r + .resolve_channel("chat:posted", &serde_json::json!({})) + .unwrap_err(); + assert!(matches!( + err, + EventClassChannelResolveError::MissingPayloadField { required_field: "roomId", .. } + )); + } + + #[test] + fn resolve_channel_undeclared() { + let r = EventClassRegistry::new(); + let err = r + .resolve_channel("never:declared", &serde_json::json!({})) + .unwrap_err(); + assert!(matches!(err, EventClassChannelResolveError::Undeclared(_))); + } + + #[test] + fn resolve_channel_not_broadcast() { + let r = EventClassRegistry::new(); + r.declare("widget:mounted", &local_cfg()).unwrap(); + let err = r + .resolve_channel("widget:mounted", &serde_json::json!({})) + .unwrap_err(); + assert!(matches!(err, EventClassChannelResolveError::NotBroadcast(_))); + } + + #[test] + fn singleton_persists_across_calls() { + // Use a unique-per-test name so we don't conflict with other tests + // sharing the singleton. + let name = "singleton:persists"; + declare_event_class(name, &local_cfg()).unwrap(); + let fetched = lookup_event_class(name).unwrap(); + assert_eq!(fetched.name, name); + } +} diff --git a/src/workers/continuum-core/src/events/mod.rs b/src/workers/continuum-core/src/events/mod.rs new file mode 100644 index 000000000..5d35fd9c6 --- /dev/null +++ b/src/workers/continuum-core/src/events/mod.rs @@ -0,0 +1,25 @@ +//! Event-class registry — the Rust-truth layer for cross-environment +//! event metadata that decides which transport tier carries each event. +//! +//! Roadmap item L1-1 (see docs/grid/GRID-MIGRATION-ROADMAP.md). +//! Spec: GRID-BUS-ARCHITECTURE §2.2 + §6.2 (continuum#1439). +//! +//! Continuum-side TS reads through the IPC binding (`bindings/modules/events.ts`) +//! and the thin shim at `src/system/events/shared/EventClass.ts`. Per the +//! native-truth-thin-SDK-per-language pattern, this module is the single +//! canonical source of EventClass declarations + lookups; the TS side +//! caches reads locally for the hot emit-path but never mutates without +//! going through the IPC. + +pub mod event_class; +pub mod event_class_registry; + +pub use event_class::{ + resolve_event_class_config, EventClassChannelStrategy, EventClassConfig, + EventClassDeclareError, EventClassUnknownSchemaPolicy, ResolvedEventClassConfig, +}; +pub use event_class_registry::{ + declare_event_class, event_class_registry, list_event_classes, lookup_event_class, + resolve_event_class_channel, EventClassChannelResolveError, EventClassRegistry, + EventClassRegistryError, +}; diff --git a/src/workers/continuum-core/src/ipc/mod.rs b/src/workers/continuum-core/src/ipc/mod.rs index 73f029a68..1c49ed775 100644 --- a/src/workers/continuum-core/src/ipc/mod.rs +++ b/src/workers/continuum-core/src/ipc/mod.rs @@ -11,6 +11,7 @@ use crate::modules::cognition::{CognitionModule, CognitionState}; use crate::modules::data::DataModule; use crate::modules::dataset::DatasetModule; use crate::modules::embedding::EmbeddingModule; +use crate::modules::events::EventsModule; use crate::modules::forge::ForgeModule; use crate::modules::gpu::GpuModule; use crate::modules::grid::GridModule; @@ -730,6 +731,14 @@ pub fn start_server( // real foundry executor. runtime.register(Arc::new(ForgeModule::new())); + // EventsModule (L1-1 — event-class declaration registry). + // Spec: GRID-BUS-ARCHITECTURE §2.2 (continuum#1439). + // Exposes events/declare-class, events/get-class, events/list-classes, + // events/resolve-channel. The TS thin shim at src/system/events/shared/ + // EventClass.ts reads through this; the L1-2 AircEventTransport will + // consult resolve-channel at emit time. + runtime.register(Arc::new(EventsModule::new())); + // Phase 1: PersonaAllocatorModule (hardware-aware persona allocation) runtime.register(Arc::new(PersonaAllocatorModule::new(gpu_manager.clone()))); diff --git a/src/workers/continuum-core/src/lib.rs b/src/workers/continuum-core/src/lib.rs index dca34fda6..41b570a71 100644 --- a/src/workers/continuum-core/src/lib.rs +++ b/src/workers/continuum-core/src/lib.rs @@ -23,6 +23,7 @@ pub mod code; pub mod comms; pub mod cognition; pub mod concurrency; +pub mod events; pub mod ffi; pub mod forge; pub mod governor; diff --git a/src/workers/continuum-core/src/modules/events.rs b/src/workers/continuum-core/src/modules/events.rs new file mode 100644 index 000000000..bdc547923 --- /dev/null +++ b/src/workers/continuum-core/src/modules/events.rs @@ -0,0 +1,298 @@ +//! EventsModule — IPC commands for the event-class registry. +//! +//! Roadmap item L1-1 (see docs/grid/GRID-MIGRATION-ROADMAP.md). +//! Spec: GRID-BUS-ARCHITECTURE §2.2 (continuum#1439). +//! +//! Commands: +//! - `events/declare-class`: Register a new event class with transport-routing +//! metadata. Idempotent for identical re-declarations; errors on conflicting +//! re-declarations (wire-contract integrity). +//! - `events/get-class`: Look up a single class's resolved config. Returns +//! null when undeclared (caller falls back to default backward-compat +//! behavior). +//! - `events/list-classes`: Snapshot of all declared classes. Used by the +//! TS-side cache on startup + by `grid/show-event-classes` introspection. +//! - `events/resolve-channel`: Resolve the airc channel for an emit. Used +//! by the L1-2 AircEventTransport when it lands. + +use crate::events::{ + declare_event_class, list_event_classes, lookup_event_class, resolve_event_class_channel, + EventClassChannelResolveError, EventClassConfig, EventClassRegistryError, +}; +use crate::runtime::{CommandResult, ModuleConfig, ModuleContext, ModulePriority, ServiceModule}; +use async_trait::async_trait; +use serde::Deserialize; +use serde_json::Value; +use std::any::Any; + +pub struct EventsModule; + +impl EventsModule { + pub fn new() -> Self { + Self + } +} + +impl Default for EventsModule { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Deserialize)] +struct DeclareClassParams { + name: String, + #[serde(flatten)] + config: EventClassConfig, +} + +#[derive(Debug, Deserialize)] +struct GetClassParams { + name: String, +} + +#[derive(Debug, Deserialize)] +struct ResolveChannelParams { + name: String, + /// Event payload. Channel strategies that depend on payload fields + /// (ByRoomId, ByPeerId) extract from this. + #[serde(default)] + payload: Value, +} + +#[async_trait] +impl ServiceModule for EventsModule { + fn config(&self) -> ModuleConfig { + ModuleConfig { + name: "events", + priority: ModulePriority::Normal, + command_prefixes: &["events/"], + event_subscriptions: &[], + needs_dedicated_thread: false, + max_concurrency: 0, + tick_interval: None, + } + } + + async fn initialize(&self, _ctx: &ModuleContext) -> Result<(), String> { + Ok(()) + } + + async fn handle_command(&self, command: &str, params: Value) -> Result { + match command { + "events/declare-class" => { + let parsed: DeclareClassParams = serde_json::from_value(params) + .map_err(|e| format!("events/declare-class: invalid params: {e}"))?; + let resolved = declare_event_class(&parsed.name, &parsed.config) + .map_err(declare_error_to_string)?; + let json = serde_json::to_value(&resolved) + .map_err(|e| format!("events/declare-class: serialize result: {e}"))?; + Ok(CommandResult::Json(json)) + } + + "events/get-class" => { + let parsed: GetClassParams = serde_json::from_value(params) + .map_err(|e| format!("events/get-class: invalid params: {e}"))?; + match lookup_event_class(&parsed.name) { + Some(cfg) => { + let json = serde_json::to_value(&cfg) + .map_err(|e| format!("events/get-class: serialize result: {e}"))?; + Ok(CommandResult::Json(json)) + } + // Return JSON null — caller treats as "no class declared, + // use default backward-compat behavior." + None => Ok(CommandResult::Json(Value::Null)), + } + } + + "events/list-classes" => { + let classes = list_event_classes(); + let json = serde_json::to_value(&classes) + .map_err(|e| format!("events/list-classes: serialize result: {e}"))?; + Ok(CommandResult::Json(json)) + } + + "events/resolve-channel" => { + let parsed: ResolveChannelParams = serde_json::from_value(params) + .map_err(|e| format!("events/resolve-channel: invalid params: {e}"))?; + match resolve_event_class_channel(&parsed.name, &parsed.payload) { + Ok(channel) => Ok(CommandResult::Json(serde_json::json!({ + "channel": channel, + }))), + Err(e) => Err(resolve_error_to_string(e)), + } + } + + other => Err(format!("Unknown events command: {other}")), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +fn declare_error_to_string(e: EventClassRegistryError) -> String { + match e { + EventClassRegistryError::Declare(inner) => format!("events/declare-class: {inner}"), + } +} + +fn resolve_error_to_string(e: EventClassChannelResolveError) -> String { + format!("events/resolve-channel: {e}") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::events::EventClassChannelStrategy; + + fn declare_params_local(name: &str) -> Value { + serde_json::json!({ + "name": name, + "broadcast": false, + "schemaVersion": "v1", + }) + } + + fn declare_params_broadcast_global(name: &str) -> Value { + serde_json::json!({ + "name": name, + "broadcast": true, + "channel": "global", + "schemaVersion": "v1", + }) + } + + #[tokio::test] + async fn declare_then_get_via_ipc() { + let module = EventsModule::new(); + // Use unique-per-test names to avoid cross-test contamination of + // the singleton. + let name = "ipc-test:declare-then-get"; + + let result = module + .handle_command("events/declare-class", declare_params_broadcast_global(name)) + .await + .unwrap(); + match result { + CommandResult::Json(v) => { + assert_eq!(v.get("name").and_then(|x| x.as_str()), Some(name)); + assert_eq!(v.get("broadcast").and_then(|x| x.as_bool()), Some(true)); + assert_eq!(v.get("channel").and_then(|x| x.as_str()), Some("global")); + } + _ => panic!("expected json result"), + } + + let result = module + .handle_command( + "events/get-class", + serde_json::json!({ "name": name }), + ) + .await + .unwrap(); + match result { + CommandResult::Json(v) => { + assert_eq!(v.get("name").and_then(|x| x.as_str()), Some(name)); + } + _ => panic!("expected json result"), + } + } + + #[tokio::test] + async fn get_undeclared_returns_null() { + let module = EventsModule::new(); + let result = module + .handle_command( + "events/get-class", + serde_json::json!({ "name": "never:declared-by-ipc-test" }), + ) + .await + .unwrap(); + match result { + CommandResult::Json(Value::Null) => {} + other => panic!("expected null, got {other:?}"), + } + } + + #[tokio::test] + async fn declare_idempotent() { + let module = EventsModule::new(); + let name = "ipc-test:idempotent"; + + let first = module + .handle_command("events/declare-class", declare_params_local(name)) + .await + .unwrap(); + let second = module + .handle_command("events/declare-class", declare_params_local(name)) + .await + .unwrap(); + match (first, second) { + (CommandResult::Json(a), CommandResult::Json(b)) => assert_eq!(a, b), + _ => panic!("expected json results"), + } + } + + #[tokio::test] + async fn resolve_channel_global_via_ipc() { + let module = EventsModule::new(); + let name = "ipc-test:resolve-global"; + module + .handle_command("events/declare-class", declare_params_broadcast_global(name)) + .await + .unwrap(); + + let result = module + .handle_command( + "events/resolve-channel", + serde_json::json!({ "name": name, "payload": {} }), + ) + .await + .unwrap(); + match result { + CommandResult::Json(v) => { + assert_eq!(v.get("channel").and_then(|x| x.as_str()), Some("global")); + } + _ => panic!("expected json result"), + } + } + + #[tokio::test] + async fn list_classes_includes_declared() { + let module = EventsModule::new(); + // Use a uniquely-prefixed name so we can find it in the global + // list even if other tests declared others. + let name = "ipc-test:list-check-unique-name-xyz"; + module + .handle_command("events/declare-class", declare_params_local(name)) + .await + .unwrap(); + + let result = module + .handle_command("events/list-classes", serde_json::json!({})) + .await + .unwrap(); + match result { + CommandResult::Json(v) => { + let arr = v.as_array().expect("list returns array"); + let found = arr.iter().any(|c| { + c.get("name").and_then(|n| n.as_str()) == Some(name) + }); + assert!(found, "declared class should appear in list"); + } + _ => panic!("expected json array"), + } + } + + // Smoke that the channel-strategy enum serializes the way the TS side expects. + #[test] + fn channel_strategy_serializes_camel_case() { + let global = EventClassChannelStrategy::Global; + let by_room = EventClassChannelStrategy::ByRoomId; + let by_peer = EventClassChannelStrategy::ByPeerId; + assert_eq!(serde_json::to_string(&global).unwrap(), "\"global\""); + assert_eq!(serde_json::to_string(&by_room).unwrap(), "\"byRoomId\""); + assert_eq!(serde_json::to_string(&by_peer).unwrap(), "\"byPeerId\""); + } +} diff --git a/src/workers/continuum-core/src/modules/mod.rs b/src/workers/continuum-core/src/modules/mod.rs index 23d55085c..1eacd45dc 100644 --- a/src/workers/continuum-core/src/modules/mod.rs +++ b/src/workers/continuum-core/src/modules/mod.rs @@ -22,6 +22,7 @@ pub mod docker_tier; pub mod docker_tier_pool; pub mod embedding; pub mod entity_schemas; +pub mod events; pub mod forge; pub mod gpu; pub mod grid;