diff --git a/src/docs/architecture/ALPHA-GAP-RUST-PERSONA-RUNTIME.md b/src/docs/architecture/ALPHA-GAP-RUST-PERSONA-RUNTIME.md new file mode 100644 index 000000000..ac706ddfc --- /dev/null +++ b/src/docs/architecture/ALPHA-GAP-RUST-PERSONA-RUNTIME.md @@ -0,0 +1,451 @@ +# Alpha Gap: Rust Persona Runtime + +## Status + +Continuum is not alpha-ready while persona chat depends on TypeScript as the runtime authority. + +The current failure is measurable: + +- PR #1061 live smoke on Mac M-series, branch `fix/persona-chat-inference-priority`, marker `codex-1061-chat-smoke-1778202469`. +- `collaboration/chat/send` stored the message immediately. +- After 195 seconds, only CodeReview AI replied. +- Teacher, Helper, Local Assistant, and Vision AI did not reply. + +That means the issue is larger than background Hippocampus LLM contention. Node-side orchestration is too slow, too opaque, and too easy to regress. The persona system needs the same shape as a high-performance 3D engine: a Rust frame/turn loop, explicit resource budgets, predictable scheduling, and thin adapters at the edge. + +## Product Bar + +Alpha chat must meet these gates on a local machine: + +- First visible local persona response in under 10 seconds for text-only chat. +- All eligible local personas either respond or emit observable silence reasons within 30 seconds. +- No background memory, RAG, embedding, or health job may consume the visible chat inference lane without Rust scheduler admission. +- Model/provider choice must come from a single typed registry and capability query, not string checks scattered through TS. +- Local means Qwen/llama.cpp through Continuum's runtime. Ollama is not a supported concept. +- UI and commands may be TypeScript, but persona runtime authority must be Rust. + +## Engine Model + +Rust owns: + +- Turn admission and batching. +- Persona response scheduling. +- Dependency wakeups between turn artifacts and subscriber work. +- Local inference lane capacity. 
+- Model and provider selection. +- RAG source fan-out and shared cache keys. +- Memory consolidation admission. +- LoRA, KV, and multimodal resource paging. +- Runtime metrics and slow-command evidence. + +TypeScript owns: + +- Browser UI. +- Command adapters. +- Entity loading until the data module is fully Rust-backed. +- Presentation and operator tooling. + +TypeScript must not own: + +- Which personas run. +- In what order they run. +- How many local generations run at once. +- Which model satisfies a capability request. +- Whether background work may use the inference lane. + +## CBAR Precedent: Turn Frames, Not FIFO Chat + +The old CB mobile SDK solved the same class of problem under harder latency +pressure. Its C++ core owned the frame loop, cache invalidation, analyzer +cadence, and backpressure; Objective-C, Swift, Kotlin, and web wrappers were +bindings. Continuum needs the same split: Rust is the engine, TypeScript is a +thin adapter. + +The direct mapping: + +- `CBAR_VideoFrame` becomes a `CognitionTurnFrame`. +- Lazy image getters become lazy turn artifacts: canonical room snapshot, + conversation history, shared RAG results, capability plan, model selection, + prompt fragments, embedding batches, and memory deltas. +- Analyzer subscribers become persona recipes, memory jobs, RAG jobs, tool + jobs, and airc bridge jobs. +- `QueueThread` priority/cadence becomes Rust resource-class queues with + explicit local inference, embedding, I/O, and background budgets. +- Frame-drop backpressure becomes stale-work cancellation: if a newer chat + turn supersedes a background semantic-memory synthesis job, keep the latest + raw memory and drop or defer the stale synthesis. + +The core rule is dependency wakeup, not global FIFO. Work never waits for +unrelated work. A job declares which artifact keys it needs; when those keys +become ready, subscribers wake. 
If terrain changes in CBAR, semantic +segmentation, color filtering, ORB, SLAM, and surface accumulation wake +according to their declared cadence. If a chat turn arrives in Continuum, the +shared turn artifacts build once, then eligible personas, memory jobs, and +export/airc observers wake from those artifacts. + +The architecture must preserve these invariants: + +- The hot path never blocks on background work. +- Runtime workers should stay busy with ready work, but worker saturation must + not become a global lock. +- The scheduler starts from maximum safe parallelism: CPUs busy, GPU admitted + deliberately, and independent work running concurrently. It reduces cadence, + precision, or concurrency only when measured pressure or dependency order + requires it. +- Shared artifacts are computed once per turn and cached by stable key. +- Subscribers can run at different cadences and priorities. +- Each subscriber owns its trigger predicate: artifact changed, elapsed time, + resource pressure changed, explicit command, or human/agent event. +- Backpressure prefers latest useful state over draining stale queues. +- Model/GPU work is admitted by Rust before it starts. +- Wrapper layers do not invent scheduling policy. + +## Contract Style: Small Interfaces, Opaque Engines + +CBAR kept the hard machinery behind small C++ classes. `PIMPL` hid memory +layout, cache state, thread ownership, and platform-specific buffers while the +public headers stayed small. Continuum needs the Rust equivalent: + +- Public contracts are small typed structs and traits. +- Runtime state is opaque and owned by Rust. +- Boundaries pass handles, ids, and leases instead of copying memory. Large + payloads such as media frames, embeddings, KV caches, model weights, LoRA + pages, WebRTC buffers, and Bevy textures stay resident in their owning pool. +- Extension points are capability/recipe/model traits, not callback trees full + of scheduling policy. 
+- Threading and multiprocessing are low-friction because queues, wakeups, + pressure, and metrics are inherited from the engine. +- Adding a new persona recipe, model family, LoRA paging policy, RAG source, or + game observer should mean implementing a narrow trait and declaring + dependencies, not rewriting orchestration. + +The repeated pattern should be: + +1. Declare input artifacts and capabilities. +2. Declare resource class and budget. +3. Pass artifact handles, not copied payloads. +4. Implement the small work trait. +5. Let Rust schedule, coalesce, wake, defer, and measure it. + +That is the SOLID boundary for this project. Wrappers and feature modules ask +for work; the Rust engine decides how to run it. + +This also covers always-on contexts such as a game running in the background. +The game stream is just another artifact producer. New terrain, changed quest +state, visible enemies, or elapsed cadence can wake vision, code, memory, or +planning subscribers without blocking chat. If the GPU budget is tight, Rust +degrades intentionally: skip stale frames, lower cadence, summarize, or emit a +silence/deferred reason. It must not let background perception kill visible +conversation. + +This is the engine-level answer to the current persona flood. The failure is +not just "too many messages"; it is missing turn-frame consolidation. Multiple +personas responding to one room event should share one room snapshot, one RAG +fan-out, one model-capability resolution, and one scheduler decision. They +should not each build a private universe and fight over the same local model. + +## Existing Rust Assets + +Keep and extend these instead of recreating logic in TypeScript: + +- `workers/continuum-core/src/cognition/turn_batch.rs`: deterministic per-turn planning. +- `workers/continuum-core/src/persona/channel_queue.rs`: consolidated domain queues. +- `workers/continuum-core/src/persona/channel_registry.rs`: service-cycle scheduling. 
+- `workers/continuum-core/src/persona/response.rs`: per-persona response path. +- `workers/continuum-core/src/persona/model_selection.rs`: adapter-aware model selection. +- `workers/continuum-core/src/model_registry/*`: typed model/provider/capability registry. +- `workers/continuum-core/src/inference/backends/llamacpp_scheduler.rs`: llama.cpp scheduling. +- `workers/continuum-core/src/paging/broker.rs`: cross-pool pressure broker. +- `workers/continuum-core/src/runtime/*`: module registry, metrics, IPC, event bus, and concurrency limits. + +## Adaptive Throughput Substrate + +The best complete throughput design in the Cambrian codebase is CBAR: +bounded `QueueThread` workers, lazy frame artifacts, subscriber analyzers, +priority/cadence, newest-state backpressure, and thin platform wrappers. +Continuum has several strong Rust primitives, but they are not yet one unified +substrate: + +- `ServiceModule` and `ModuleConfig`: one runtime extension seam for commands, + event subscriptions, priority, concurrency, and ticks. +- `MessageBus`: typed event fan-out with coalescing and recent-event replay. +- `llamacpp_scheduler`: continuous local generation, sequence attribution, and + future LoRA/KV routing point. +- `FootprintRegistry`: cross-resource accounting by backend, persona, and + resource kind. +- `PagedResourcePool`: generic residency, pinning, LRU-style eviction, stats, + and reload/spill hooks. +- `PressureBroker`: cross-pool pressure decisions. +- `ChannelQueue` / `QueueItemBehavior`: generic containers where domain items + own priority, consolidation, and staleness. + +These should converge into one reusable adaptive-throughput pattern for every +expensive process: + +1. A job declares identity: `turn_key`, `artifact_key`, `persona_id`, + `resource_class`, and optional `recipe/model/provider`. +2. A job declares dependencies by handle, not payload. +3. A scheduler admits the job when dependencies are ready and resources fit. +4. 
The job runs in the narrowest resource lane that can satisfy it: CPU, data, + GPU, embedding, local generation, cloud provider, I/O, media, render, + memory, or background. +5. The job emits typed artifacts/events and updates footprint/trace metrics. +6. Downstream subscribers wake from artifact readiness, not from global FIFO. + +This becomes the repeated process model for chat, RAG, memory consolidation, +embedding, vision, live video, game observers, LoRA paging, MoE expert routing, +airc bridging, and grid-distributed work. + +The same substrate must cover the historically troublesome paths: + +- ORM/data: canonical entity resolution and query work move through `Data` + lanes and emit handles, not browser-authoritative identity blobs. +- Inference: local Qwen/llama.cpp generation moves through `LocalGeneration` + lanes backed by model residency and KV/LoRA pressure. +- WebRTC/audio/video: packet/frame work moves through `Media` lanes and passes + frame ids, buffer leases, and content hashes. +- Bevy/live rendering: render work moves through `Render` lanes and passes + texture ids or GPU residency handles. + +The substrate must be adaptive before it is clever: + +- Start from maximum safe parallelism. +- Keep CPU workers busy with independent ready work. +- Admit GPU/model work deliberately from memory and lane evidence. +- Prefer latest useful state over draining stale queues. +- Coalesce repeated work by stable identity keys. +- Degrade cadence, precision, context, or subscriber count under pressure. +- Surface deferrals and silence reasons as first-class output. +- Never copy large payloads across process or language boundaries when a handle + can identify resident data. + +The failure to avoid is every module owning its own queue, throttle, retry, +cache, and memory heuristic. The extension author should implement a small +contract and inherit the hard parts: scheduling, pressure, telemetry, artifact +cache negotiation, and wakeups. 
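The "small contract, inherited wakeups" idea above can be sketched in a few lines of Rust. This is only an illustration under stated assumptions: the `Work` trait, the `Declared` struct, the `Lane` enum, `wake_rounds`, and every artifact key below are hypothetical names invented for this example, not the engine's real API. Work declares what it needs and what it produces, and a round-based loop wakes everything whose inputs are ready, so nothing waits behind unrelated work.

```rust
use std::collections::BTreeSet;

/// Hypothetical lanes; a subset of the resource lanes named in the text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Lane {
    Cpu,
    LocalGeneration,
    Background,
}

/// Hypothetical small contract: declare inputs, output, and lane. Nothing else.
trait Work {
    fn name(&self) -> &str;
    fn needs(&self) -> Vec<String>;
    fn produces(&self) -> String;
    fn lane(&self) -> Lane;
}

/// Simplest possible implementor, used only for the example.
struct Declared {
    name: &'static str,
    needs: &'static [&'static str],
    produces: &'static str,
    lane: Lane,
}

impl Work for Declared {
    fn name(&self) -> &str { self.name }
    fn needs(&self) -> Vec<String> { self.needs.iter().map(|k| k.to_string()).collect() }
    fn produces(&self) -> String { self.produces.to_string() }
    fn lane(&self) -> Lane { self.lane }
}

/// Each round wakes every piece of work whose declared inputs are ready.
/// Work whose inputs never become ready is left waiting, not run.
fn wake_rounds(mut pending: Vec<Box<dyn Work>>, mut ready: BTreeSet<String>) -> Vec<Vec<String>> {
    let mut rounds = Vec::new();
    loop {
        let (woken, waiting): (Vec<_>, Vec<_>) = pending
            .into_iter()
            .partition(|work| work.needs().iter().all(|key| ready.contains(key)));
        if woken.is_empty() {
            return rounds;
        }
        rounds.push(woken.iter().map(|work| work.name().to_string()).collect());
        for work in &woken {
            ready.insert(work.produces());
        }
        pending = waiting;
    }
}

/// Illustrative turn: two personas share one RAG artifact and one snapshot.
fn sample_work() -> Vec<Box<dyn Work>> {
    vec![
        Box::new(Declared { name: "persona-a", needs: &["shared-rag"], produces: "reply-a", lane: Lane::LocalGeneration }),
        Box::new(Declared { name: "memory-synthesis", needs: &["raw-memory"], produces: "memory-delta", lane: Lane::Background }),
        Box::new(Declared { name: "rag", needs: &["room-snapshot"], produces: "shared-rag", lane: Lane::Cpu }),
        Box::new(Declared { name: "persona-b", needs: &["shared-rag"], produces: "reply-b", lane: Lane::LocalGeneration }),
        Box::new(Declared { name: "snapshot", needs: &[], produces: "room-snapshot", lane: Lane::Cpu }),
    ]
}

fn main() {
    for work in sample_work() {
        println!("{} runs in lane {:?}", work.name(), work.lane());
    }
    for (index, round) in wake_rounds(sample_work(), BTreeSet::new()).iter().enumerate() {
        println!("round {index}: {round:?}");
    }
}
```

In this sketch submission order is irrelevant: both personas wake together once the shared RAG artifact exists, and the memory job stays deferred because nothing produced its input. A real engine would also apply lane budgets and staleness before admitting woken work.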
+ +### Pipes Carry Leases, Not Bytes + +Continuum already moves audio, video, WebRTC/UDP traffic, Docker-hosted +services, inference contexts, embeddings, and chat artifacts across module +boundaries. Generic IPC becomes the bottleneck when each boundary copies the +bytes and each module rehydrates its own view of the world. + +The shared pattern must be: + +- Media frames live in a media/frame pool and cross boundaries as frame ids, + texture ids, or buffer leases. +- WebRTC/UDP payloads stay in transport-owned buffers until a subscriber + explicitly needs a decoded artifact. +- Embeddings live in an embedding pool and cross boundaries as vector handles + plus version/content hashes. +- KV cache pages, LoRA pages, mmproj weights, and model weights live in paging + pools and cross boundaries as residency leases. +- Chat/RAG/context artifacts live behind stable turn keys and source hashes, + not copied prompt blobs on every persona call. +- Docker/process boundaries use the same handle protocol when the underlying + memory cannot be shared directly: pass ids, ranges, hashes, offsets, and + leases; copy only at the final unavoidable edge. + +IPC should move control messages and handles. Bulk bytes stay resident in the +nearest owning pool. This is how the system avoids clogging pipes while still +letting independent modules subscribe to the same live world. + +## Failure Modes To Eliminate + +### Single-Responder Collapse + +Symptom: only one persona replies to a broad human message. + +Root causes to prevent: + +- TS-side coordination window or locks silently deciding for all personas. +- Local provider queue monopolized by one persona or background work. +- RAG/source fan-out repeated per persona until the first responder consumes all budget. + +Rust fix: + +- `cognition/plan-turn-batch` returns one `PersonaTurnPlan` per candidate, with generation order, wave, estimated start, and estimated finish. +- The host must execute that plan or surface why it cannot. 
+- A later Rust `persona/run-turn` command should execute the plan directly and return posted response envelopes. +- The plan is the first `CognitionTurnFrame`: every shared artifact in it must + be reused across persona subscribers unless explicitly declared + persona-local. +- The plan exposes whether the turn can meet the first-response and + all-responses alpha budgets before expensive execution starts. + +### Slow Chat + +Symptom: first reply arrives after 95+ seconds. + +Root causes to prevent: + +- Node event loop is the scheduler. +- Background tasks share local generation without admission. +- Model startup, RAG, and memory work are serialized without a visible plan. + +Rust fix: + +- Planner consumes local capacity from `inference/capacity`. +- Planner emits waves and expected timing. +- Runtime metrics report queue time versus execution time for every module command. + +### ORM And Room Identity Drift + +Symptom: stale General room tabs, wrong UUIDs, old chat rows, localStorage resurrecting ghost rooms. + +Root causes to prevent: + +- Multiple sources of truth for default rooms. +- URL rewrite before canonical room resolution. +- Browser-local state overriding ORM truth. + +Rust fix: + +- Data module becomes the canonical room/activity resolver. +- UI receives canonical handles after resolution. +- Browser caches may remember view state, not entity identity. + +### IPC Drift + +Symptom: TS and Rust believe different things about capacity, model capabilities, or command state. + +Root causes to prevent: + +- Hand-written TS types or duplicate constants. +- Commands returning success while the downstream runtime did nothing. +- Fire-and-forget process boundaries hiding failures. + +Rust fix: + +- ts-rs generated contracts for planner/runtime payloads. +- Command execution throws on failure at the caller boundary. +- Runtime metrics expose command queue time and error count. 
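As a rough illustration of "planner emits waves and expected timing", the sketch below assigns personas to generation waves from the local capacity and checks the result against the 10 second and 30 second alpha gates. It assumes a single uniform per-persona generation estimate, which is a simplification invented for this example. The real planner in `turn_batch.rs` is not shown in this document and may compute timing differently. `WaveSlot`, `plan_waves`, and `meets_budgets` are hypothetical names. Only the field names and the "zero means one lane" rule come from the text.

```rust
/// Hypothetical schedule entry. Field names follow the document's
/// `generation_wave`, `estimated_start_ms`, and `estimated_finish_ms`.
#[derive(Debug, PartialEq, Eq)]
struct WaveSlot {
    generation_order: usize,
    generation_wave: usize,
    estimated_start_ms: u64,
    estimated_finish_ms: u64,
}

/// Assigns personas to waves of at most `local_capacity` concurrent generations.
/// Assumption: every generation takes the same `est_generation_ms`.
fn plan_waves(persona_count: usize, local_capacity: usize, est_generation_ms: u64) -> Vec<WaveSlot> {
    // Zero capacity means unknown and is treated as one lane.
    let lanes = local_capacity.max(1);
    (0..persona_count)
        .map(|order| {
            let wave = order / lanes;
            let start = wave as u64 * est_generation_ms;
            WaveSlot {
                generation_order: order,
                generation_wave: wave,
                estimated_start_ms: start,
                estimated_finish_ms: start + est_generation_ms,
            }
        })
        .collect()
}

/// Returns (meets first-response budget, meets all-responses budget).
fn meets_budgets(slots: &[WaveSlot], first_budget_ms: u64, all_budget_ms: u64) -> (bool, bool) {
    let first = slots.iter().map(|s| s.estimated_finish_ms).min().unwrap_or(0);
    let all = slots.iter().map(|s| s.estimated_finish_ms).max().unwrap_or(0);
    (first <= first_budget_ms, all <= all_budget_ms)
}

fn main() {
    // Five local personas, two lanes, an assumed 8 s per generation.
    let slots = plan_waves(5, 2, 8_000);
    for slot in &slots {
        println!("{slot:?}");
    }
    println!("budgets met: {:?}", meets_budgets(&slots, 10_000, 30_000));
}
```

With two lanes, five personas fit in three waves and the last estimated finish is 24 seconds, inside the 30 second gate. With one lane the same turn would need 40 seconds, so the plan can report the miss before any expensive execution starts.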
+ +## PR Sequence + +### PR A: Rust Turn Schedule Contract + +Purpose: make scheduling explicit and testable. + +Scope: + +- Extend `RecipeTurnBatchRequest` with `local_inference_capacity`. +- Extend `PersonaTurnPlan` with `generation_wave`, `estimated_start_ms`, and `estimated_finish_ms`. +- Extend `RecipeTurnBatchPlan` with first-response/all-responses budget + evidence. +- Keep planner pure: no ORM, no inference, no filesystem. +- Add unit tests for deterministic waves and capacity. +- Document the CBAR-derived dependency-wakeup model as the alpha runtime + direction. + +Validation: + +- `cargo test -p continuum-core --features metal,accelerate cognition::turn_batch --lib` + +### PR B: TypeScript Adapter Obeys Rust Plan + +Purpose: stop TS from inventing its own fan-out and ordering. + +Scope: + +- The chat path calls `cognition/plan-turn-batch` before building per-persona context. +- RAG shared sources are loaded once per turn. +- Persona execution follows `generation_wave` and local capacity. +- If execution diverges from plan, log a structured runtime error. + +Validation: + +- Browser chat smoke sends one marker. +- Export must show every eligible persona either responded or emitted a silence reason within 30 seconds. +- Runtime metrics must show no unplanned local inference calls. + +### PR C: Rust Persona Run-Turn + +Purpose: move the turn loop into Rust. + +Scope: + +- Add `cognition/run-turn` or `persona/run-turn`. +- Input: trigger, candidates, room snapshot, model/capability declarations. +- Output: response envelopes and silence reasons. +- Rust uses the channel registry and response path directly. +- TypeScript only posts returned envelopes through existing chat storage until the data module is Rust-backed. + +Validation: + +- Rust unit tests for scheduler behavior. +- Integration replay for two, three, and five local personas. +- Slow-command metrics prove queue time and inference time separately. 
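The PR C validation asks that slow-command metrics separate queue time from inference time. A minimal sketch of that split, assuming three timestamps are recorded per command, could look like the following. `CommandTrace`, `SlowCause`, and the threshold parameter are hypothetical and do not describe the existing runtime metrics module.

```rust
/// Hypothetical timestamps recorded for one module command, in milliseconds.
#[derive(Debug, Clone, Copy)]
struct CommandTrace {
    enqueued_at_ms: u64,
    started_at_ms: u64,
    finished_at_ms: u64,
}

/// Where a slow command spent most of its time.
#[derive(Debug, PartialEq, Eq)]
enum SlowCause {
    NotSlow,
    QueueBound,
    ExecutionBound,
}

impl CommandTrace {
    /// Time spent waiting for admission.
    fn queue_ms(&self) -> u64 {
        self.started_at_ms.saturating_sub(self.enqueued_at_ms)
    }

    /// Time spent actually running (for chat, mostly inference).
    fn execution_ms(&self) -> u64 {
        self.finished_at_ms.saturating_sub(self.started_at_ms)
    }

    /// Classifies the command against an assumed slow threshold.
    fn classify(&self, slow_threshold_ms: u64) -> SlowCause {
        let total = self.queue_ms() + self.execution_ms();
        if total < slow_threshold_ms {
            SlowCause::NotSlow
        } else if self.queue_ms() > self.execution_ms() {
            SlowCause::QueueBound
        } else {
            SlowCause::ExecutionBound
        }
    }
}

fn main() {
    // A reply that waited 90 s in a queue and then generated in 5 s.
    let trace = CommandTrace { enqueued_at_ms: 0, started_at_ms: 90_000, finished_at_ms: 95_000 };
    println!(
        "queue={}ms execution={}ms cause={:?}",
        trace.queue_ms(),
        trace.execution_ms(),
        trace.classify(10_000)
    );
}
```

Reporting the two durations separately is what lets a smoke test distinguish scheduler starvation from a genuinely slow model.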
+ +### PR D: Rust Model Resolver + +Purpose: one typed source of truth for model capability matching. + +Scope: + +- Add a request shape like `ModelRequirement`. +- Fields include capabilities, architecture family, context window range, memory budget, modality, provider preference, and local/cloud policy. +- Resolver returns a concrete model id, provider id, expected memory footprint, and reason. +- No hard-coded persona model names in TS. + +Validation: + +- Qwen3.5 text model selected for text chat on local. +- Qwen2-VL selected for vision when vision is requested and memory allows. +- Missing model produces an actionable error, not a fallback to a random provider. + +### PR E: Rust Memory/RAG Admission + +Purpose: background cognition cannot starve chat. + +Scope: + +- Memory consolidation is a scheduled background job with a resource class. +- Semantic compression requires explicit admission from the Rust scheduler. +- RAG source cache is keyed by the turn planner and reused across personas. + +Validation: + +- A chat smoke with memory enabled still meets the 10s/30s gates. +- Runtime metrics show background work deferred under chat load. + +### PR F: Rust Data Canonical Handles + +Purpose: eliminate ghost rooms and browser state authority. + +Scope: + +- Canonical room resolution moves behind the Rust data/runtime boundary. +- Browser routing uses resolved handles only. +- LocalStorage cannot create or select an entity id before canonical resolution. + +Validation: + +- Clearing or retaining browser storage yields the same canonical General room. +- No deterministic `stringToUUID("General")` style fallback appears in the UI path. + +## Test Strategy + +Use VDD plus TDD: + +- TDD for pure Rust units: planner, model resolver, queue consolidation, capacity waves. +- VDD for live behavior: browser chat marker, response count, latency, model used, CPU/GPU utilization. +- Replay tests for captured failures. 
+- Metrics tests for queue time, generation time, silence reasons, and background deferral. + +Every PR must include: + +- A focused Rust test when it touches runtime logic. +- A live chat smoke result when it claims chat improvement. +- A short note explaining whether Node authority increased, decreased, or stayed flat. + +## Immediate Rule + +Do not merge a chat-path PR to canary based only on compile success. + +For chat-path work, the merge gate is: + +- CI green. +- Rust focused tests green. +- Live chat smoke produces useful persona behavior, or the PR is explicitly labeled as instrumentation/guardrail and not claimed as a chat fix. diff --git a/src/shared/generated/cognition/AdaptiveThroughputPlan.ts b/src/shared/generated/cognition/AdaptiveThroughputPlan.ts new file mode 100644 index 000000000..7cbf48241 --- /dev/null +++ b/src/shared/generated/cognition/AdaptiveThroughputPlan.ts @@ -0,0 +1,4 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ThroughputJob } from "./ThroughputJob"; + +export type AdaptiveThroughputPlan = { admitted: Array<ThroughputJob>, deferredMissingDependencies: Array<ThroughputJob>, deferredResourcePressure: Array<ThroughputJob>, droppedStale: Array<ThroughputJob>, droppedSuperseded: Array<ThroughputJob>, }; diff --git a/src/shared/generated/cognition/AdaptiveThroughputRequest.ts b/src/shared/generated/cognition/AdaptiveThroughputRequest.ts new file mode 100644 index 000000000..29e4bce19 --- /dev/null +++ b/src/shared/generated/cognition/AdaptiveThroughputRequest.ts @@ -0,0 +1,5 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. 
+import type { ThroughputJob } from "./ThroughputJob"; +import type { ThroughputLaneBudget } from "./ThroughputLaneBudget"; + +export type AdaptiveThroughputRequest = { readyArtifactKeys: Array<string>, laneBudgets: Array<ThroughputLaneBudget>, jobs: Array<ThroughputJob>, nowMs: number, }; diff --git a/src/shared/generated/cognition/PersonaTurnPlan.ts b/src/shared/generated/cognition/PersonaTurnPlan.ts index 3b8b1b3b1..9961a977c 100644 --- a/src/shared/generated/cognition/PersonaTurnPlan.ts +++ b/src/shared/generated/cognition/PersonaTurnPlan.ts @@ -3,4 +3,4 @@ /** * Persona-specific work item for the turn. */ -export type PersonaTurnPlan = { personaId: string, displayName: string, specialty: string, model: string, provider: string, localModel: boolean, generationOrder: number, personaContextKey: string, ragCacheKey: string, inputBudgetTokens: number, maxOutputTokens: number, sourceNames: Array<string>, }; +export type PersonaTurnPlan = { personaId: string, displayName: string, specialty: string, model: string, provider: string, localModel: boolean, generationOrder: number, generationWave: number, personaContextKey: string, ragCacheKey: string, inputBudgetTokens: number, maxOutputTokens: number, estimatedStartMs: number, estimatedFinishMs: number, sourceNames: Array<string>, }; diff --git a/src/shared/generated/cognition/RecipeTurnBatchPlan.ts b/src/shared/generated/cognition/RecipeTurnBatchPlan.ts index d6e5dd1f8..563f7e1d2 100644 --- a/src/shared/generated/cognition/RecipeTurnBatchPlan.ts +++ b/src/shared/generated/cognition/RecipeTurnBatchPlan.ts @@ -5,4 +5,4 @@ import type { SharedRagSourcePlan } from "./SharedRagSourcePlan"; /** * Result of `cognition/plan-turn-batch`. 
 */ -export type RecipeTurnBatchPlan = { turnKey: string, roomId: string, messageId?: string, queryText: string, sharedSources: Array<SharedRagSourcePlan>, personaPlans: Array<PersonaTurnPlan>, skippedDuplicatePersonaIds: Array<string>, maxConcurrentLocalGenerations: number, }; +export type RecipeTurnBatchPlan = { turnKey: string, roomId: string, messageId?: string, queryText: string, sharedSources: Array<SharedRagSourcePlan>, personaPlans: Array<PersonaTurnPlan>, skippedDuplicatePersonaIds: Array<string>, maxConcurrentLocalGenerations: number, estimatedFirstResponseMs: number, estimatedAllResponsesMs: number, meetsFirstResponseBudget: boolean, meetsAllResponsesBudget: boolean, }; diff --git a/src/shared/generated/cognition/RecipeTurnBatchRequest.ts b/src/shared/generated/cognition/RecipeTurnBatchRequest.ts index 1b336391f..0239af34e 100644 --- a/src/shared/generated/cognition/RecipeTurnBatchRequest.ts +++ b/src/shared/generated/cognition/RecipeTurnBatchRequest.ts @@ -11,4 +11,21 @@ export type RecipeTurnBatchRequest = { trigger: RecipeTurnTrigger, personas: Arr * Total input-token budget for shared RAG planning. Per-persona * generation still uses each candidate's model limits. */ -totalInputBudgetTokens: number, }; +totalInputBudgetTokens: number, +/** + * Local inference lanes available for this turn. Zero means unknown, + * treated as one lane. The host should pass `inference/capacity` here + * so the planner, admission control, and runtime scheduler share the + * same source of truth. + */ +localInferenceCapacity: number, +/** + * Visible-response budget for the first local persona reply. Zero means + * use the alpha gate default. + */ +firstResponseBudgetMs: number, +/** + * Visible-response budget for every admitted persona to either respond + * or emit a silence reason. Zero means use the alpha gate default. 
+ */ +allResponsesBudgetMs: number, }; diff --git a/src/shared/generated/cognition/ResourceClass.ts b/src/shared/generated/cognition/ResourceClass.ts new file mode 100644 index 000000000..601fa45f1 --- /dev/null +++ b/src/shared/generated/cognition/ResourceClass.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ResourceClass = "CPU" | "DATA" | "GPU" | "EMBEDDING" | "LOCAL_GENERATION" | "CLOUD_PROVIDER" | "IO" | "MEDIA" | "RENDER" | "MEMORY" | "BACKGROUND"; diff --git a/src/shared/generated/cognition/TargetSilicon.ts b/src/shared/generated/cognition/TargetSilicon.ts new file mode 100644 index 000000000..fa0ca373d --- /dev/null +++ b/src/shared/generated/cognition/TargetSilicon.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type TargetSilicon = "CPU" | "GPU" | "UNIFIED_MEMORY" | "NETWORK" | "DISK" | "CLOUD" | "BACKGROUND"; diff --git a/src/shared/generated/cognition/ThroughputJob.ts b/src/shared/generated/cognition/ThroughputJob.ts new file mode 100644 index 000000000..c8b1e6af5 --- /dev/null +++ b/src/shared/generated/cognition/ThroughputJob.ts @@ -0,0 +1,9 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ResourceClass } from "./ResourceClass"; +import type { TargetSilicon } from "./TargetSilicon"; + +export type ThroughputJob = { jobId: string, artifactKey: string, resourceClass: ResourceClass, targetSilicon: TargetSilicon, priority: number, costUnits: number, dependencyKeys: Array, createdAtMs: number, +/** + * Zero means never stale. 
+ */ +staleAfterMs: number, }; diff --git a/src/shared/generated/cognition/ThroughputLaneBudget.ts b/src/shared/generated/cognition/ThroughputLaneBudget.ts new file mode 100644 index 000000000..46e35a2fd --- /dev/null +++ b/src/shared/generated/cognition/ThroughputLaneBudget.ts @@ -0,0 +1,10 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ResourceClass } from "./ResourceClass"; +import type { TargetSilicon } from "./TargetSilicon"; + +export type ThroughputLaneBudget = { +/** + * Semantic owner for observability. Admission is keyed by target_silicon + * so LocalGeneration, Media, and Render can share one physical GPU budget. + */ +resourceClass: ResourceClass, targetSilicon: TargetSilicon, maxConcurrency: number, maxCostUnits: number, }; diff --git a/src/workers/continuum-core/src/cognition/adaptive_throughput.rs b/src/workers/continuum-core/src/cognition/adaptive_throughput.rs new file mode 100644 index 000000000..2db2048fc --- /dev/null +++ b/src/workers/continuum-core/src/cognition/adaptive_throughput.rs @@ -0,0 +1,593 @@ +//! Adaptive throughput planning primitives. +//! +//! This is the small, pure contract behind the "Adaptive Throughput +//! Substrate" architecture. It does not execute jobs, touch IPC, load +//! models, or inspect ORM state. It answers one question: +//! +//! Given ready artifacts, resource lane budgets, and a batch of proposed +//! jobs, which jobs should run now, which should defer, and which stale +//! duplicates should be dropped? +//! +//! Every expensive subsystem should eventually map into this shape: chat, +//! RAG, memory, embeddings, vision, live video, game observers, local +//! generation, LoRA paging, MoE expert routing, airc bridging, and +//! grid-distributed work. +//! +//! This is a planner, not a scheduler. Callers re-plan when MessageBus (or +//! another wake source) reports that artifact keys became ready. The lease +//! 
layer will later connect these admitted jobs to FootprintRegistry and +//! PressureBroker ownership; this module intentionally stays pure. + +use serde::{Deserialize, Serialize}; +use std::collections::{BTreeMap, BTreeSet}; +use ts_rs::TS; + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize, TS)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +#[ts( + export, + export_to = "../../../shared/generated/cognition/ResourceClass.ts" +)] +pub enum ResourceClass { + Cpu, + Data, + Gpu, + Embedding, + LocalGeneration, + CloudProvider, + Io, + Media, + Render, + Memory, + Background, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize, TS)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +#[ts( + export, + export_to = "../../../shared/generated/cognition/TargetSilicon.ts" +)] +pub enum TargetSilicon { + Cpu, + Gpu, + UnifiedMemory, + Network, + Disk, + Cloud, + Background, +} + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts( + export, + export_to = "../../../shared/generated/cognition/ThroughputLaneBudget.ts" +)] +pub struct ThroughputLaneBudget { + /// Semantic owner for observability. Admission is keyed by target_silicon + /// so LocalGeneration, Media, and Render can share one physical GPU budget. 
+ pub resource_class: ResourceClass, + pub target_silicon: TargetSilicon, + pub max_concurrency: usize, + pub max_cost_units: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts( + export, + export_to = "../../../shared/generated/cognition/ThroughputJob.ts" +)] +pub struct ThroughputJob { + pub job_id: String, + pub artifact_key: String, + pub resource_class: ResourceClass, + pub target_silicon: TargetSilicon, + pub priority: u32, + pub cost_units: u32, + #[serde(default)] + pub dependency_keys: Vec<String>, + #[serde(default)] + #[ts(type = "number")] + pub created_at_ms: u64, + /// Zero means never stale. + #[serde(default)] + #[ts(type = "number")] + pub stale_after_ms: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts( + export, + export_to = "../../../shared/generated/cognition/AdaptiveThroughputRequest.ts" +)] +pub struct AdaptiveThroughputRequest { + #[serde(default)] + pub ready_artifact_keys: Vec<String>, + pub lane_budgets: Vec<ThroughputLaneBudget>, + pub jobs: Vec<ThroughputJob>, + #[serde(default)] + #[ts(type = "number")] + pub now_ms: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts( + export, + export_to = "../../../shared/generated/cognition/AdaptiveThroughputPlan.ts" +)] +pub struct AdaptiveThroughputPlan { + pub admitted: Vec<ThroughputJob>, + pub deferred_missing_dependencies: Vec<ThroughputJob>, + pub deferred_resource_pressure: Vec<ThroughputJob>, + pub dropped_stale: Vec<ThroughputJob>, + pub dropped_superseded: Vec<ThroughputJob>, +} + +pub fn plan_adaptive_throughput(req: AdaptiveThroughputRequest) -> AdaptiveThroughputPlan { + let ready_artifacts: BTreeSet<String> = req.ready_artifact_keys.into_iter().collect(); + let lane_budgets = normalize_lane_budgets(req.lane_budgets); + let mut usable_jobs = Vec::new(); + let mut dropped_stale = Vec::new(); + + for job in req.jobs { + if is_stale(&job, req.now_ms) { + dropped_stale.push(job); + } else { + usable_jobs.push(job); + } + } + + let (coalesced_jobs, 
dropped_superseded) = coalesce_by_identity(usable_jobs); + + let mut dependency_ready = Vec::new(); + let mut deferred_missing_dependencies = Vec::new(); + for job in coalesced_jobs { + if dependencies_ready(&job, &ready_artifacts) { + dependency_ready.push(job); + } else { + deferred_missing_dependencies.push(job); + } + } + + dependency_ready.sort_by(compare_jobs); + + let mut used_by_lane: BTreeMap<TargetSilicon, (usize, u32)> = BTreeMap::new(); + let mut admitted = Vec::new(); + let mut deferred_resource_pressure = Vec::new(); + + for job in dependency_ready { + if can_admit(&job, &lane_budgets, &used_by_lane) { + let used = used_by_lane.entry(job.target_silicon).or_insert((0, 0)); + used.0 += 1; + used.1 = used.1.saturating_add(job.cost_units); + admitted.push(job); + } else { + deferred_resource_pressure.push(job); + } + } + + AdaptiveThroughputPlan { + admitted, + deferred_missing_dependencies, + deferred_resource_pressure, + dropped_stale, + dropped_superseded, + } +} + +fn normalize_lane_budgets( + budgets: Vec<ThroughputLaneBudget>, +) -> BTreeMap<TargetSilicon, ThroughputLaneBudget> { + budgets + .into_iter() + .map(|budget| (budget.target_silicon, budget)) + .collect() +} + +fn is_stale(job: &ThroughputJob, now_ms: u64) -> bool { + job.stale_after_ms > 0 && now_ms.saturating_sub(job.created_at_ms) > job.stale_after_ms +} + +fn coalesce_by_identity(jobs: Vec<ThroughputJob>) -> (Vec<ThroughputJob>, Vec<ThroughputJob>) { + let mut winners: BTreeMap<(ResourceClass, String), ThroughputJob> = BTreeMap::new(); + let mut dropped = Vec::new(); + + for job in jobs { + let key = (job.resource_class, job.artifact_key.clone()); + if let Some(existing) = winners.get(&key) { + if compare_jobs(&job, existing).is_lt() { + dropped.push(existing.clone()); + winners.insert(key, job); + } else { + dropped.push(job); + } + } else { + winners.insert(key, job); + } + } + + (winners.into_values().collect(), dropped) +} + +fn dependencies_ready(job: &ThroughputJob, ready_artifacts: &BTreeSet<String>) -> bool { + job.dependency_keys + .iter() + .all(|key| ready_artifacts.contains(key)) +} + +fn can_admit( + job: 
&ThroughputJob, + budgets: &BTreeMap<TargetSilicon, ThroughputLaneBudget>, + used_by_lane: &BTreeMap<TargetSilicon, (usize, u32)>, +) -> bool { + let Some(budget) = budgets.get(&job.target_silicon) else { + return false; + }; + let used = used_by_lane + .get(&job.target_silicon) + .copied() + .unwrap_or((0, 0)); + used.0 < budget.max_concurrency + && used.1.saturating_add(job.cost_units) <= budget.max_cost_units +} + +fn compare_jobs(left: &ThroughputJob, right: &ThroughputJob) -> std::cmp::Ordering { + right + .priority + .cmp(&left.priority) + .then_with(|| right.created_at_ms.cmp(&left.created_at_ms)) + .then_with(|| left.job_id.cmp(&right.job_id)) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn budget( + resource_class: ResourceClass, + target_silicon: TargetSilicon, + max_concurrency: usize, + ) -> ThroughputLaneBudget { + ThroughputLaneBudget { + resource_class, + target_silicon, + max_concurrency, + max_cost_units: 1_000, + } + } + + fn job( + id: &str, + artifact: &str, + resource_class: ResourceClass, + target_silicon: TargetSilicon, + priority: u32, + ) -> ThroughputJob { + ThroughputJob { + job_id: id.to_string(), + artifact_key: artifact.to_string(), + resource_class, + target_silicon, + priority, + cost_units: 1, + dependency_keys: Vec::new(), + created_at_ms: 100, + stale_after_ms: 0, + } + } + + #[test] + fn independent_ready_work_is_not_blocked_by_missing_dependencies() { + let mut blocked = job( + "blocked", + "blocked-output", + ResourceClass::LocalGeneration, + TargetSilicon::Gpu, + 100, + ); + blocked.dependency_keys = vec!["missing-rag".to_string()]; + + let plan = plan_adaptive_throughput(AdaptiveThroughputRequest { + ready_artifact_keys: vec!["room-snapshot".to_string()], + lane_budgets: vec![ + budget(ResourceClass::LocalGeneration, TargetSilicon::Gpu, 1), + budget(ResourceClass::Cpu, TargetSilicon::Cpu, 4), + ], + jobs: vec![ + blocked, + job( + "cpu-ready", + "analysis", + ResourceClass::Cpu, + TargetSilicon::Cpu, + 50, + ), + job( + "local-ready", + "reply", + ResourceClass::LocalGeneration, + 
TargetSilicon::Gpu, + 40, + ), + ], + now_ms: 150, + }); + + let admitted: Vec<&str> = plan + .admitted + .iter() + .map(|job| job.job_id.as_str()) + .collect(); + assert_eq!(admitted, vec!["cpu-ready", "local-ready"]); + assert_eq!(plan.deferred_missing_dependencies.len(), 1); + assert_eq!(plan.deferred_missing_dependencies[0].job_id, "blocked"); + } + + #[test] + fn same_artifact_jobs_coalesce_to_latest_highest_priority_work() { + let old = job( + "old", + "turn-rag", + ResourceClass::Cpu, + TargetSilicon::Cpu, + 10, + ); + let mut new = job( + "new", + "turn-rag", + ResourceClass::Cpu, + TargetSilicon::Cpu, + 10, + ); + new.created_at_ms = 200; + + let plan = plan_adaptive_throughput(AdaptiveThroughputRequest { + ready_artifact_keys: Vec::new(), + lane_budgets: vec![budget(ResourceClass::Cpu, TargetSilicon::Cpu, 4)], + jobs: vec![old, new], + now_ms: 250, + }); + + assert_eq!(plan.admitted.len(), 1); + assert_eq!(plan.admitted[0].job_id, "new"); + assert_eq!(plan.dropped_superseded.len(), 1); + assert_eq!(plan.dropped_superseded[0].job_id, "old"); + } + + #[test] + fn resource_lane_budget_defers_excess_without_blocking_other_lanes() { + let plan = plan_adaptive_throughput(AdaptiveThroughputRequest { + ready_artifact_keys: Vec::new(), + lane_budgets: vec![ + budget(ResourceClass::LocalGeneration, TargetSilicon::Gpu, 1), + budget(ResourceClass::Embedding, TargetSilicon::Cpu, 2), + ], + jobs: vec![ + job( + "local-a", + "reply-a", + ResourceClass::LocalGeneration, + TargetSilicon::Gpu, + 100, + ), + job( + "local-b", + "reply-b", + ResourceClass::LocalGeneration, + TargetSilicon::Gpu, + 90, + ), + job( + "embed-a", + "embedding-a", + ResourceClass::Embedding, + TargetSilicon::Cpu, + 10, + ), + job( + "embed-b", + "embedding-b", + ResourceClass::Embedding, + TargetSilicon::Cpu, + 9, + ), + ], + now_ms: 150, + }); + + let admitted: Vec<&str> = plan + .admitted + .iter() + .map(|job| job.job_id.as_str()) + .collect(); + assert_eq!(admitted, vec!["local-a", "embed-a", 
"embed-b"]); + assert_eq!(plan.deferred_resource_pressure.len(), 1); + assert_eq!(plan.deferred_resource_pressure[0].job_id, "local-b"); + } + + #[test] + fn stale_work_is_dropped_before_it_consumes_lane_budget() { + let mut stale = job( + "stale", + "old-frame", + ResourceClass::Gpu, + TargetSilicon::Gpu, + 100, + ); + stale.created_at_ms = 0; + stale.stale_after_ms = 50; + + let plan = plan_adaptive_throughput(AdaptiveThroughputRequest { + ready_artifact_keys: Vec::new(), + lane_budgets: vec![budget(ResourceClass::Gpu, TargetSilicon::Gpu, 1)], + jobs: vec![ + stale, + job( + "fresh", + "new-frame", + ResourceClass::Gpu, + TargetSilicon::Gpu, + 10, + ), + ], + now_ms: 100, + }); + + assert_eq!(plan.admitted.len(), 1); + assert_eq!(plan.admitted[0].job_id, "fresh"); + assert_eq!(plan.dropped_stale.len(), 1); + assert_eq!(plan.dropped_stale[0].job_id, "stale"); + } + + #[test] + fn orm_inference_webrtc_and_bevy_paths_share_the_same_substrate() { + let mut inference = job( + "infer", + "turn:1:reply", + ResourceClass::LocalGeneration, + TargetSilicon::Gpu, + 90, + ); + inference.dependency_keys = vec!["room:general:canonical".to_string()]; + + let mut media = job( + "webrtc", + "frame:42:decoded", + ResourceClass::Media, + TargetSilicon::Gpu, + 80, + ); + media.dependency_keys = vec!["packet:42".to_string()]; + + let mut render = job( + "bevy", + "texture:42", + ResourceClass::Render, + TargetSilicon::Gpu, + 70, + ); + render.dependency_keys = vec!["frame:42:decoded".to_string()]; + + let plan = plan_adaptive_throughput(AdaptiveThroughputRequest { + ready_artifact_keys: vec![ + "room:general:canonical".to_string(), + "packet:42".to_string(), + ], + lane_budgets: vec![ + budget(ResourceClass::Data, TargetSilicon::Cpu, 4), + budget(ResourceClass::LocalGeneration, TargetSilicon::Gpu, 2), + ], + jobs: vec![ + job( + "orm", + "room:general:canonical", + ResourceClass::Data, + TargetSilicon::Cpu, + 100, + ), + inference, + media, + render, + ], + now_ms: 150, + }); + + let 
admitted: Vec<&str> = plan + .admitted + .iter() + .map(|job| job.job_id.as_str()) + .collect(); + assert_eq!(admitted, vec!["orm", "infer", "webrtc"]); + assert_eq!(plan.deferred_missing_dependencies.len(), 1); + assert_eq!(plan.deferred_missing_dependencies[0].job_id, "bevy"); + } + + #[test] + fn replanning_moves_dependency_ready_work_into_admitted() { + let mut render = job( + "bevy", + "texture:42", + ResourceClass::Render, + TargetSilicon::Gpu, + 70, + ); + render.dependency_keys = vec!["frame:42:decoded".to_string()]; + + let first_plan = plan_adaptive_throughput(AdaptiveThroughputRequest { + ready_artifact_keys: Vec::new(), + lane_budgets: vec![budget(ResourceClass::Render, TargetSilicon::Gpu, 1)], + jobs: vec![render.clone()], + now_ms: 150, + }); + + assert_eq!(first_plan.admitted.len(), 0); + assert_eq!(first_plan.deferred_missing_dependencies.len(), 1); + + let second_plan = plan_adaptive_throughput(AdaptiveThroughputRequest { + ready_artifact_keys: vec!["frame:42:decoded".to_string()], + lane_budgets: vec![budget(ResourceClass::Render, TargetSilicon::Gpu, 1)], + jobs: vec![render], + now_ms: 151, + }); + + assert_eq!(second_plan.deferred_missing_dependencies.len(), 0); + assert_eq!(second_plan.admitted.len(), 1); + assert_eq!(second_plan.admitted[0].job_id, "bevy"); + } + + #[test] + fn gpu_bound_work_shares_one_physical_budget_across_semantic_classes() { + let plan = plan_adaptive_throughput(AdaptiveThroughputRequest { + ready_artifact_keys: Vec::new(), + lane_budgets: vec![budget(ResourceClass::Gpu, TargetSilicon::Gpu, 2)], + jobs: vec![ + job( + "local-a", + "reply-a", + ResourceClass::LocalGeneration, + TargetSilicon::Gpu, + 100, + ), + job( + "local-b", + "reply-b", + ResourceClass::LocalGeneration, + TargetSilicon::Gpu, + 99, + ), + job( + "media", + "frame:42", + ResourceClass::Media, + TargetSilicon::Gpu, + 98, + ), + job( + "render", + "texture:42", + ResourceClass::Render, + TargetSilicon::Gpu, + 97, + ), + ], + now_ms: 150, + }); + + let 
admitted: Vec<&str> = plan + .admitted + .iter() + .map(|job| job.job_id.as_str()) + .collect(); + let deferred: Vec<&str> = plan + .deferred_resource_pressure + .iter() + .map(|job| job.job_id.as_str()) + .collect(); + assert_eq!(admitted, vec!["local-a", "local-b"]); + assert_eq!(deferred, vec!["media", "render"]); + } +} diff --git a/src/workers/continuum-core/src/cognition/mod.rs b/src/workers/continuum-core/src/cognition/mod.rs index 90d42fee9..5a3339e74 100644 --- a/src/workers/continuum-core/src/cognition/mod.rs +++ b/src/workers/continuum-core/src/cognition/mod.rs @@ -29,11 +29,13 @@ pub mod response_orchestrator; pub mod response_validator; +pub mod adaptive_throughput; pub mod shared_analysis; pub mod tool_executor; pub mod turn_batch; pub mod types; +pub use adaptive_throughput::*; pub use response_orchestrator::{ orchestrate, score_persona, PersonaSlot, DEFAULT_RELEVANCE_THRESHOLD, }; diff --git a/src/workers/continuum-core/src/cognition/turn_batch.rs b/src/workers/continuum-core/src/cognition/turn_batch.rs index 999fd7b5a..e128378b9 100644 --- a/src/workers/continuum-core/src/cognition/turn_batch.rs +++ b/src/workers/continuum-core/src/cognition/turn_batch.rs @@ -98,6 +98,32 @@ pub struct RecipeTurnBatchRequest { /// generation still uses each candidate's model limits. #[serde(default)] pub total_input_budget_tokens: usize, + /// Local inference lanes available for this turn. Zero means unknown, + /// treated as one lane. The host should pass `inference/capacity` here + /// so the planner, admission control, and runtime scheduler share the + /// same source of truth. + #[serde(default)] + pub local_inference_capacity: usize, + /// Visible-response budget for the first local persona reply. Zero means + /// use the alpha gate default. 
+ #[serde(default = "default_first_response_budget_ms")] + #[ts(type = "number")] + pub first_response_budget_ms: u64, + /// Visible-response budget for every admitted persona to either respond + /// or emit a silence reason. Zero means use the alpha gate default. + #[serde(default = "default_all_responses_budget_ms")] + #[ts(type = "number")] + pub all_responses_budget_ms: u64, +} + +fn default_first_response_budget_ms() -> u64 { + // Alpha SLO: visible local chat must produce its first response inside 10s. + 10_000 +} + +fn default_all_responses_budget_ms() -> u64 { + // Alpha SLO: all eligible personas must respond or emit silence inside 30s. + 30_000 } /// One shared RAG source load in the plan. @@ -129,10 +155,15 @@ pub struct PersonaTurnPlan { pub provider: String, pub local_model: bool, pub generation_order: usize, + pub generation_wave: usize, pub persona_context_key: String, pub rag_cache_key: String, pub input_budget_tokens: usize, pub max_output_tokens: usize, + #[ts(type = "number")] + pub estimated_start_ms: u64, + #[ts(type = "number")] + pub estimated_finish_ms: u64, pub source_names: Vec, } @@ -154,9 +185,16 @@ pub struct RecipeTurnBatchPlan { pub persona_plans: Vec, pub skipped_duplicate_persona_ids: Vec, pub max_concurrent_local_generations: usize, + #[ts(type = "number")] + pub estimated_first_response_ms: u64, + #[ts(type = "number")] + pub estimated_all_responses_ms: u64, + pub meets_first_response_budget: bool, + pub meets_all_responses_budget: bool, } pub fn plan_turn_batch(req: RecipeTurnBatchRequest) -> RecipeTurnBatchPlan { + let max_concurrent_local_generations = local_generation_capacity(&req); let turn_key = stable_key(&[ "turn", &req.trigger.room_id.to_string(), @@ -180,6 +218,7 @@ pub fn plan_turn_batch(req: RecipeTurnBatchRequest) -> RecipeTurnBatchPlan { let mut seen_personas = HashSet::new(); let mut skipped_duplicate_persona_ids = Vec::new(); let mut persona_plans = Vec::new(); + let mut local_generation_count = 0usize; for 
candidate in req.personas { if !seen_personas.insert(candidate.persona_id) { @@ -188,6 +227,20 @@ pub fn plan_turn_batch(req: RecipeTurnBatchRequest) -> RecipeTurnBatchPlan { } let generation_order = persona_plans.len(); + let local_model = is_local_provider(&candidate.provider, &candidate.model); + let generation_wave = if local_model { + let wave = local_generation_count / max_concurrent_local_generations; + local_generation_count += 1; + wave + } else { + 0 + }; + let estimated_start_ms = if local_model { + estimate_wave_start_ms(&persona_plans, generation_wave) + } else { + 0 + }; + let estimated_duration_ms = estimate_generation_ms(&candidate); let input_budget_tokens = candidate .context_window .saturating_sub(candidate.max_output_tokens) @@ -212,16 +265,37 @@ pub fn plan_turn_batch(req: RecipeTurnBatchRequest) -> RecipeTurnBatchPlan { specialty: candidate.specialty, model: candidate.model.clone(), provider: candidate.provider.clone(), - local_model: is_local_provider(&candidate.provider, &candidate.model), + local_model, generation_order, + generation_wave, persona_context_key, rag_cache_key, input_budget_tokens, max_output_tokens: candidate.max_output_tokens, + estimated_start_ms, + estimated_finish_ms: estimated_start_ms.saturating_add(estimated_duration_ms), source_names: shared_source_names.clone(), }); } + let estimated_first_response_ms = persona_plans + .iter() + .filter(|plan| plan.local_model) + .map(|plan| plan.estimated_finish_ms) + .min() + .unwrap_or(0); + let estimated_all_responses_ms = persona_plans + .iter() + .filter(|plan| plan.local_model) + .map(|plan| plan.estimated_finish_ms) + .max() + .unwrap_or(0); + + let first_response_budget_ms = + effective_budget_ms(req.first_response_budget_ms, default_first_response_budget_ms()); + let all_responses_budget_ms = + effective_budget_ms(req.all_responses_budget_ms, default_all_responses_budget_ms()); + RecipeTurnBatchPlan { turn_key, room_id: req.trigger.room_id, @@ -230,8 +304,49 @@ pub fn 
plan_turn_batch(req: RecipeTurnBatchRequest) -> RecipeTurnBatchPlan { shared_sources, persona_plans, skipped_duplicate_persona_ids, - max_concurrent_local_generations: 1, + max_concurrent_local_generations, + estimated_first_response_ms, + estimated_all_responses_ms, + meets_first_response_budget: estimated_first_response_ms <= first_response_budget_ms, + meets_all_responses_budget: estimated_all_responses_ms <= all_responses_budget_ms, + } +} + +fn effective_budget_ms(requested: u64, default_budget: u64) -> u64 { + if requested == 0 { + default_budget + } else { + requested + } +} + +fn local_generation_capacity(req: &RecipeTurnBatchRequest) -> usize { + let requested = req.local_inference_capacity.max(1); + let local_persona_count = req + .personas + .iter() + .filter(|candidate| is_local_provider(&candidate.provider, &candidate.model)) + .count() + .max(1); + requested.min(local_persona_count) +} + +fn estimate_wave_start_ms(existing_plans: &[PersonaTurnPlan], generation_wave: usize) -> u64 { + if generation_wave == 0 { + return 0; } + + existing_plans + .iter() + .filter(|plan| plan.local_model && plan.generation_wave == generation_wave - 1) + .map(|plan| plan.estimated_finish_ms) + .max() + .unwrap_or(0) +} + +fn estimate_generation_ms(candidate: &RecipePersonaCandidate) -> u64 { + let tokens_per_second = candidate.tokens_per_second.unwrap_or(1.0).max(1.0); + (((candidate.max_output_tokens as f32) / tokens_per_second) * 1000.0).ceil() as u64 } fn normalize_sources(sources: Vec) -> Vec { @@ -364,6 +479,9 @@ mod tests { }, ], total_input_budget_tokens: 12_000, + local_inference_capacity: 1, + first_response_budget_ms: default_first_response_budget_ms(), + all_responses_budget_ms: default_all_responses_budget_ms(), } } @@ -431,5 +549,94 @@ mod tests { assert!(plan.persona_plans.iter().all(|p| p.local_model)); assert_eq!(plan.persona_plans[0].generation_order, 0); assert_eq!(plan.persona_plans[1].generation_order, 1); + 
assert_eq!(plan.persona_plans[0].generation_wave, 0); + assert_eq!(plan.persona_plans[1].generation_wave, 1); + assert_eq!( + plan.persona_plans[1].estimated_start_ms, + plan.persona_plans[0].estimated_finish_ms + ); + assert_eq!( + plan.estimated_first_response_ms, + plan.persona_plans[0].estimated_finish_ms + ); + assert_eq!( + plan.estimated_all_responses_ms, + plan.persona_plans[1].estimated_finish_ms + ); + } + + #[test] + fn local_generation_uses_declared_capacity_for_parallel_waves() { + let mut req = request(); + req.local_inference_capacity = 2; + + let plan = plan_turn_batch(req); + + assert_eq!(plan.max_concurrent_local_generations, 2); + assert_eq!(plan.persona_plans[0].generation_wave, 0); + assert_eq!(plan.persona_plans[1].generation_wave, 0); + assert_eq!(plan.persona_plans[0].estimated_start_ms, 0); + assert_eq!(plan.persona_plans[1].estimated_start_ms, 0); + } + + #[test] + fn exposes_budget_failure_before_execution() { + let mut req = request(); + req.local_inference_capacity = 1; + req.first_response_budget_ms = 1; + req.all_responses_budget_ms = 1; + + let plan = plan_turn_batch(req); + + assert!(!plan.meets_first_response_budget); + assert!(!plan.meets_all_responses_budget); + } + + #[test] + fn zero_budget_uses_alpha_defaults() { + let mut req = request(); + req.personas[0].max_output_tokens = 16; + req.personas[1].max_output_tokens = 16; + req.first_response_budget_ms = 0; + req.all_responses_budget_ms = 0; + + let plan = plan_turn_batch(req); + + assert!(plan.meets_first_response_budget); + assert!(plan.meets_all_responses_budget); + } + + #[test] + fn local_models_are_waved_while_cloud_models_are_not() { + let mut req = request(); + req.local_inference_capacity = 1; + req.personas = vec![ + candidate( + "11111111-1111-4111-8111-111111111111", + "Local One", + "local", + ), + candidate( + "22222222-2222-4222-8222-222222222222", + "Cloud One", + "anthropic", + ), + candidate( + "33333333-3333-4333-8333-333333333333", + "Local Two", + "local", 
+ ), + ]; + req.personas[1].model = "claude-opus-4.1".to_string(); + + let plan = plan_turn_batch(req); + + assert_eq!(plan.max_concurrent_local_generations, 1); + assert!(plan.persona_plans[0].local_model); + assert!(!plan.persona_plans[1].local_model); + assert!(plan.persona_plans[2].local_model); + assert_eq!(plan.persona_plans[0].generation_wave, 0); + assert_eq!(plan.persona_plans[1].generation_wave, 0); + assert_eq!(plan.persona_plans[2].generation_wave, 1); } }
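
The wave arithmetic that `plan_turn_batch` applies to local personas can be tried in isolation. The block below is a minimal standalone sketch, not code from this PR: `LocalCandidate`, `WaveEstimate`, and `estimate_waves` are hypothetical names, and the 256-token / 32 tokens-per-second figures are assumed for illustration rather than measured. It mirrors the diff's approach as read here: a candidate's wave is its local index divided by capacity, a wave starts when the slowest member of the previous wave finishes, and duration is the full output budget at the declared rate.

```rust
// Illustrative sketch only. The types and throughput numbers below are
// assumptions for demonstration, not the PR's real types or measurements.

const FIRST_RESPONSE_BUDGET_MS: u64 = 10_000; // alpha gate default from the diff
const ALL_RESPONSES_BUDGET_MS: u64 = 30_000; // alpha gate default from the diff

#[derive(Debug, Clone)]
struct LocalCandidate {
    name: &'static str,
    max_output_tokens: usize,
    tokens_per_second: f32,
}

#[derive(Debug, Clone, PartialEq)]
struct WaveEstimate {
    name: &'static str,
    wave: usize,
    start_ms: u64,
    finish_ms: u64,
}

/// Worst-case duration: the whole output budget at the declared rate.
fn estimate_generation_ms(candidate: &LocalCandidate) -> u64 {
    let tokens_per_second = candidate.tokens_per_second.max(1.0);
    ((candidate.max_output_tokens as f32 / tokens_per_second) * 1000.0).ceil() as u64
}

/// Assigns each candidate to a wave of at most `capacity` generations.
/// A wave starts when the slowest member of the previous wave finishes.
fn estimate_waves(candidates: &[LocalCandidate], capacity: usize) -> Vec<WaveEstimate> {
    let capacity = capacity.max(1); // zero capacity is treated as one lane
    let mut estimates: Vec<WaveEstimate> = Vec::with_capacity(candidates.len());
    for (index, candidate) in candidates.iter().enumerate() {
        let wave = index / capacity;
        let start_ms = if wave == 0 {
            0
        } else {
            estimates
                .iter()
                .filter(|e| e.wave == wave - 1)
                .map(|e| e.finish_ms)
                .max()
                .unwrap_or(0)
        };
        let finish_ms = start_ms.saturating_add(estimate_generation_ms(candidate));
        estimates.push(WaveEstimate { name: candidate.name, wave, start_ms, finish_ms });
    }
    estimates
}

fn main() {
    let names = ["Teacher", "Helper", "Local Assistant", "Vision AI"];
    let candidates: Vec<LocalCandidate> = names
        .iter()
        .map(|&name| LocalCandidate { name, max_output_tokens: 256, tokens_per_second: 32.0 })
        .collect();

    for capacity in 1..=2usize {
        let estimates = estimate_waves(&candidates, capacity);
        let first = estimates.iter().map(|e| e.finish_ms).min().unwrap_or(0);
        let all = estimates.iter().map(|e| e.finish_ms).max().unwrap_or(0);
        println!(
            "capacity={capacity} first={first}ms (ok={}) all={all}ms (ok={})",
            first <= FIRST_RESPONSE_BUDGET_MS,
            all <= ALL_RESPONSES_BUDGET_MS
        );
        for estimate in &estimates {
            println!("  {estimate:?}");
        }
    }
}
```

Under these assumed numbers, one lane serializes four personas into four waves and the last finish estimate exceeds the 30-second gate, while two lanes halve the wave count and fit inside it. That is the kind of pre-execution signal `meets_all_responses_budget` is meant to surface.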