Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/workers/continuum-core/src/genome/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ pub use recall_scoring::{
DEFAULT_RECENCY_HALF_LIFE_MS,
};
pub mod recall_impl;
pub use recall_impl::{CandidateArtifact, LocalDemandAlignedRecall};
pub use recall_impl::{CandidateArtifact, CandidateSource, LocalDemandAlignedRecall};
296 changes: 283 additions & 13 deletions src/workers/continuum-core/src/genome/recall_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@
//! - Embedding model integration (the semantic factor input) —
//! separate Lane H slice.

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use ts_rs::TS;

use super::recall::{RecallScore, ResidencyHint};
use super::recall::{RecallError, RecallScore, ResidencyHint};
use super::recall_scoring::{score, DEFAULT_RECENCY_HALF_LIFE_MS};
use super::recall_trait::{
CompositionHint, EngramRef, LoRALayerRef, MoEExpertRef, RankedPool, RecallScoreWeights,
CapabilityQuery, CompositionHint, DemandAlignedRecall, EngramRef, LoRALayerRef, MoEExpertRef,
RankedPool, RecallContext, RecallScoreWeights,
RecallTrace,
};
use super::working_set::{ArtifactId, PageKind};
Expand Down Expand Up @@ -94,38 +97,102 @@ pub struct CandidateArtifact {
pub provenance_trust_factor: f32,
}

/// Supplier of recall candidates: the seam (introduced in PR-3c)
/// between the ranking engine (`LocalDemandAlignedRecall`) and the
/// substrate that knows which artifacts exist — working-set manager,
/// genome catalog, federation peers. PR-3d wraps
/// `LocalWorkingSetManager` as an implementation.
///
/// The `Send + Sync` bound lets an implementation live behind
/// `Arc<dyn CandidateSource>` and be used from tokio tasks. `fetch`
/// receives both the query and the recall context so that an
/// implementation can do query-aware pruning (scope, budget,
/// freshness) before handing candidates back.
///
/// In PR-3c the only implementations are test stubs returning a
/// canned `Vec<CandidateArtifact>`.
#[async_trait]
pub trait CandidateSource: Send + Sync {
    /// Produce the candidates relevant to `query` within the persona
    /// context described by `context`.
    ///
    /// The result is raw data: unscored and unsorted. Ranking is the
    /// engine's responsibility. An empty `Vec` is a valid answer; the
    /// engine turns it into empty pools rather than an error, which
    /// callers may treat as a cue to try federation.
    async fn fetch(
        &self,
        query: &CapabilityQuery,
        context: &RecallContext,
    ) -> Vec<CandidateArtifact>;
}

/// Per-process demand-aligned recall ranker.
///
/// Carries the governor-tunable scoring weights, the recency
/// half-life, and an optional `CandidateSource` consumed by the
/// `DemandAlignedRecall` trait impl.
///
/// Thread-safe through immutability: the fields are not changed after
/// construction, and `rank` is a pure function of the candidate set
/// plus this configuration. When no source is configured, the trait
/// impl's `recall` yields an empty pool instead of an error — a
/// legitimate "no candidates known" signal that callers may use to
/// fall back to federation.
pub struct LocalDemandAlignedRecall {
    /// Weights applied when scoring candidates.
    weights: RecallScoreWeights,
    /// Recency half-life, in milliseconds.
    half_life_ms: u64,
    /// Where `recall` fetches candidates from; `None` when nothing is wired.
    source: Option<Arc<dyn CandidateSource>>,
}

impl LocalDemandAlignedRecall {
/// Build an engine with the default weights (the sum-to-1 baseline
/// from GENOME-FOUNDRY-SENTINEL Part 7), the default 24h recency
/// half-life, and no candidate source.
///
/// `rank()` is fully usable on such an instance because the caller
/// passes candidates explicitly; the trait impl has nothing to fetch
/// from and therefore returns empty pools.
pub fn new() -> Self {
    let weights = RecallScoreWeights::default();
    let half_life_ms = DEFAULT_RECENCY_HALF_LIFE_MS;
    Self {
        weights,
        half_life_ms,
        source: None,
    }
}

/// Construct with explicit weights and half-life, and no candidate
/// source. Used by tests and when wiring governor-driven config.
///
/// The weights are taken as already valid: validation happens
/// upstream in `RecallScoreWeights::new`, not in this constructor.
pub fn with_config(weights: RecallScoreWeights, half_life_ms: u64) -> Self {
    // All three fields must be initialized. `source` is left unset,
    // so the trait impl's `recall()` returns empty pools for an
    // instance built this way.
    Self {
        weights,
        half_life_ms,
        source: None,
    }
}

/// Build an engine wired to `source`, keeping the default weights
/// and default recency half-life.
///
/// With a source present, the trait impl's `recall()` first calls
/// `source.fetch()` and then ranks what came back. Reach for
/// `with_config_and_source` when the weights or half-life must be
/// explicit.
pub fn with_source(source: Arc<dyn CandidateSource>) -> Self {
    let weights = RecallScoreWeights::default();
    let half_life_ms = DEFAULT_RECENCY_HALF_LIFE_MS;
    Self {
        weights,
        half_life_ms,
        source: Some(source),
    }
}

/// Fully explicit constructor: caller-chosen weights, caller-chosen
/// half-life, and a candidate source.
///
/// Intended for PR-3d's working-set walker, which wires
/// `LocalDemandAlignedRecall` into Runtime with governor-driven
/// configuration.
pub fn with_config_and_source(
    weights: RecallScoreWeights,
    half_life_ms: u64,
    source: Arc<dyn CandidateSource>,
) -> Self {
    let source = Some(source);
    Self {
        weights,
        half_life_ms,
        source,
    }
}

/// Score + partition + sort the candidate set. Returns a fully-
Expand Down Expand Up @@ -238,6 +305,58 @@ impl Default for LocalDemandAlignedRecall {
}
}

#[async_trait]
impl DemandAlignedRecall for LocalDemandAlignedRecall {
    /// Pull candidates from the configured `CandidateSource` and rank
    /// them.
    ///
    /// An engine built without a source (`new()` / `with_config()`)
    /// ranks an empty candidate list and so returns an empty pool. It
    /// is deliberately not an error: "no candidates known locally" is
    /// a legitimate signal callers may use to fall back to federation.
    ///
    /// The wall clock is sampled here, at the public entry point,
    /// after the fetch has completed, and handed to `rank()` as an
    /// explicit `now_ms`; `rank()` itself stays pure for replay
    /// determinism. If the system clock reads earlier than the Unix
    /// epoch, `now_ms` falls back to 0.
    ///
    /// PR-3c scope: no scope filtering, no freshness enforcement, no
    /// budget filtering in this method. Query-aware pruning belongs
    /// to the source's `fetch()`; PR-3d's working-set walker filters
    /// by `RecallScope::Local`. Later PRs add the rest.
    async fn recall(
        &self,
        query: &CapabilityQuery,
        context: &RecallContext,
    ) -> Result<RankedPool, RecallError> {
        let candidates = if let Some(source) = self.source.as_ref() {
            source.fetch(query, context).await
        } else {
            Vec::new()
        };

        let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
        let now_ms = since_epoch.map_or(0, |elapsed| elapsed.as_millis() as u64);

        Ok(self.rank(now_ms, candidates))
    }

    /// Replay is not available in PR-3c; it is deferred to a
    /// sentinel-owned PR.
    ///
    /// Always returns `RecallError::ScopeUnreachable` carrying an
    /// explanatory reason, so callers get a typed refusal instead of a
    /// silently empty pool (the "never swallow errors" rule). The
    /// sentinel PR is expected to add a RecallTraceStore mapping a
    /// `RecallTrace` to the snapshotted (weights, candidate_set,
    /// now_ms), after which replay can re-rank deterministically.
    async fn replay(
        &self,
        _trace: &super::recall_trait::RecallTrace,
    ) -> Result<RankedPool, RecallError> {
        let reason = String::from(
            "replay requires RecallTraceStore (sentinel PR); not yet implemented in PR-3c",
        );
        Err(RecallError::ScopeUnreachable { reason })
    }
}

#[cfg(test)]
mod tests {
//! Pin the ranking behavior:
Expand Down Expand Up @@ -518,4 +637,155 @@ mod tests {
"different now_ms must yield different trace_ref"
);
}

// ─── PR-3c: trait impl + CandidateSource tests ─────────────

use crate::genome::recall_trait::{
CapabilityQuery, DemandAlignedRecall, DomainHint, RecallBudget, RecallContext, RecallTrace,
};
use crate::genome::recall::{FreshnessTarget, RecallError, RecallScope, TaskKind};
use crate::genome::working_set::PersonaId;
use parking_lot::Mutex;

/// Test double for `CandidateSource`: serves the same pre-set
/// candidates on every call and counts how often it was asked, so a
/// test can assert that dispatch actually reached the source.
struct StubSource {
    /// Candidates handed back on each fetch.
    canned: Vec<CandidateArtifact>,
    /// Number of fetch invocations observed so far.
    fetch_calls: Mutex<u32>,
}

impl StubSource {
    /// Wrap `canned` in a stub whose call counter starts at zero.
    /// Returned behind an `Arc` so one handle can be given to the
    /// engine while the test keeps another for assertions.
    fn new(canned: Vec<CandidateArtifact>) -> Arc<Self> {
        let fetch_calls = Mutex::new(0);
        Arc::new(Self { canned, fetch_calls })
    }

    /// Current value of the fetch counter.
    fn fetch_count(&self) -> u32 {
        let calls = self.fetch_calls.lock();
        *calls
    }
}

#[async_trait]
impl CandidateSource for StubSource {
    /// Record the call, then return a copy of the canned candidates.
    /// The query and context are ignored.
    async fn fetch(
        &self,
        _query: &CapabilityQuery,
        _context: &RecallContext,
    ) -> Vec<CandidateArtifact> {
        {
            // Keep the guard confined to this block so the lock is
            // released before the candidates are copied.
            let mut calls = self.fetch_calls.lock();
            *calls += 1;
        }
        self.canned.to_vec()
    }
}

/// Fixture query shared by the trait-impl tests: a chat task with a
/// single "test" domain hint, local scope, best-effort freshness, a
/// budget of 1_000_000 bytes / 100 ms, and no mandatory artifacts.
fn sample_query() -> CapabilityQuery {
    let budget = RecallBudget {
        max_bytes: 1_000_000,
        max_duration_ms: 100,
    };
    let domain_hints = vec![DomainHint::new("test")];

    CapabilityQuery {
        task_kind: TaskKind::Chat,
        domain_hints,
        budget,
        must_include: Vec::new(),
        prefer_refined: true,
        scope: RecallScope::Local,
        freshness_target: FreshnessTarget::BestEffort,
    }
}

/// Fixture persona id with a fixed UUID (u128 value 100), so every
/// test run uses the same identity.
fn sample_persona() -> PersonaId {
    let raw_id = Uuid::from_u128(100);
    PersonaId::new(raw_id)
}

/// What this catches: the trait impl exists and the trait is
/// object-safe. Calling `recall` through
/// `Arc<dyn DemandAlignedRecall>` backed by
/// `LocalDemandAlignedRecall` must work — this is the seam
/// persona-cognition will use.
#[tokio::test]
async fn recall_dispatches_through_dyn_demand_aligned_recall() {
    let engine: Arc<dyn DemandAlignedRecall> = Arc::new(LocalDemandAlignedRecall::new());
    let context = RecallContext::cold_start(sample_persona());
    let query = sample_query();

    let pool = engine.recall(&query, &context).await.unwrap();

    // No source is configured, so every pool comes back empty.
    assert!(pool.layers.is_empty());
    assert!(pool.experts.is_empty());
    assert!(pool.engrams.is_empty());
}

/// What this catches: an engine with no source returns an empty
/// pool, NOT an error. The empty pool is the legitimate "no
/// candidates known locally; caller may try federation" signal.
///
/// All three pools are checked, so a regression that leaks entries
/// into `experts` or `engrams` is caught as well as one in `layers`.
#[tokio::test]
async fn recall_without_source_returns_empty_pool_not_error() {
    let recall = LocalDemandAlignedRecall::new();
    let ctx = RecallContext::cold_start(sample_persona());

    let result = recall.recall(&sample_query(), &ctx).await;

    assert!(result.is_ok(), "a missing source must not surface as an error");
    let pool = result.unwrap();
    assert!(pool.layers.is_empty());
    assert!(pool.experts.is_empty());
    assert!(pool.engrams.is_empty());
}

/// What this catches: an engine built with `with_source` really
/// calls the source's `fetch()` — the call counter proves dispatch
/// happened — and the candidate the source supplied shows up in
/// the ranked pool.
#[tokio::test]
async fn recall_with_source_dispatches_to_fetch_and_ranks() {
    let candidate = CandidateArtifact {
        kind: PageKind::LoRALayer,
        artifact_id: ArtifactId::new(Uuid::from_u128(42)),
        semantic_factor: 0.9,
        outcome_history_factor: 0.8,
        last_used_ms: 0,
        residency: ResidencyHint::Hot {
            role: super::super::tier::TierRole::Fast,
        },
        provenance_trust_factor: 0.7,
    };
    let source = StubSource::new(vec![candidate]);
    let engine = LocalDemandAlignedRecall::with_source(source.clone());
    let context = RecallContext::cold_start(sample_persona());
    let query = sample_query();

    let pool = engine.recall(&query, &context).await.unwrap();

    assert_eq!(source.fetch_count(), 1, "source.fetch must be called once");
    assert_eq!(pool.layers.len(), 1);
    let ranked_id = pool.layers[0].0.0.as_uuid();
    assert_eq!(ranked_id, Uuid::from_u128(42));
}

/// What this catches: `with_config_and_source` keeps all three
/// inputs — weights, half-life, and source. PR-3d's working-set
/// walker relies on this constructor when wiring governor-driven
/// config.
#[tokio::test]
async fn with_config_and_source_preserves_all_three() {
    let weights = RecallScoreWeights::new(0.2, 0.2, 0.2, 0.2, 0.2).unwrap();
    let source = StubSource::new(Vec::new());

    let engine =
        LocalDemandAlignedRecall::with_config_and_source(weights, 12345, source.clone());

    assert_eq!(*engine.weights(), weights);
    assert_eq!(engine.half_life_ms(), 12345);

    // One recall call proves the source handle is still attached.
    let context = RecallContext::cold_start(sample_persona());
    let query = sample_query();
    let _ = engine.recall(&query, &context).await.unwrap();
    assert_eq!(source.fetch_count(), 1, "source still wired");
}

/// What this catches: `replay` answers with the typed
/// `ScopeUnreachable` refusal and a clear reason, instead of
/// quietly handing back an empty pool (the never-swallow-errors
/// rule). Once the sentinel PR adds the RecallTraceStore, this
/// test flips to expect `Ok(pool)`.
#[tokio::test]
async fn replay_returns_typed_not_implemented_refusal_in_pr3c() {
    let engine = LocalDemandAlignedRecall::new();
    let trace = RecallTrace(ArtifactId::new(Uuid::nil()));

    let result = engine.replay(&trace).await;

    if let Err(RecallError::ScopeUnreachable { reason }) = &result {
        let names_store = reason.contains("RecallTraceStore");
        let names_stub = reason.contains("not yet implemented");
        assert!(
            names_store || names_stub,
            "expected typed not-implemented reason, got: {reason}"
        );
    } else {
        panic!("expected ScopeUnreachable, got {result:?}");
    }
}
}
Loading