Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion src/workers/continuum-core/src/modules/cognition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,15 @@ impl ServiceModule for CognitionModule {
// PressureBroker singleton (#1299) make event fanout the
// intended invariant. Inference is still gated downstream by
// ai_provider::max_concurrency. No hardcoded fixed cap here.
max_concurrency: usize::MAX,
//
// Use 0 ("no semaphore = unbounded"), NOT usize::MAX. The
// runtime's `register()` only creates a Semaphore when
// `max_concurrency > 0`; setting usize::MAX makes it call
// `Semaphore::new(usize::MAX)`, which exceeds tokio's
// MAX_PERMITS (`usize::MAX >> 3`) and panics at registration
// ("a semaphore may not have more than MAX_PERMITS permits").
// 0 expresses "no cap" without instantiating a semaphore.
max_concurrency: 0,
tick_interval: None,
}
}
Expand Down Expand Up @@ -1919,6 +1927,53 @@ mod inline_admission_tests {
}
}

#[cfg(test)]
mod registration_safety_tests {
//! Regression: `runtime::Runtime::register` calls
//! `Semaphore::new(config.max_concurrency)` whenever
//! `max_concurrency > 0`. tokio's MAX_PERMITS is `usize::MAX >> 3`
//! (~2^61), so a `max_concurrency: usize::MAX` config panics at
//! registration. CognitionModule shipped with `usize::MAX` as a
//! mistaken way to spell "no cap" (PR #1306). The fix is to use
//! `0` instead, which the runtime treats as "do not create a
//! semaphore" (truly unbounded).
//!
//! This test pins the fix in place: any future change that
//! reintroduces `usize::MAX` (or any other value above
//! MAX_PERMITS) blows up here before it reaches production.
use super::*;
use crate::rag::RagEngine;
use crate::runtime::runtime::Runtime;
use std::sync::Arc;

#[test]
fn cognition_module_registers_without_semaphore_panic() {
// If max_concurrency is ever raised above tokio's
// MAX_PERMITS this `register()` call panics. The test then
// fails with the original "may not have more than
// MAX_PERMITS" message, which is the easiest possible
// breadcrumb back to this regression.
let runtime = Runtime::new();
let rag_engine = Arc::new(RagEngine::new());
let state = Arc::new(CognitionState::new(rag_engine));
let module = Arc::new(CognitionModule::new(state));
runtime.register(module);

// No assertion needed — surviving registration IS the
// assertion. Sanity-touch config to make the dependency
// explicit for future readers.
let cfg = CognitionModule::new(Arc::new(CognitionState::new(Arc::new(
RagEngine::new(),
))))
.config();
assert_eq!(
cfg.max_concurrency, 0,
"max_concurrency must be 0 (= unbounded via no-semaphore branch); \
any positive value above tokio's MAX_PERMITS panics at registration"
);
}
}

/// Parse an InboxMessage from JSON value.
fn parse_inbox_message(value: &Value) -> Result<InboxMessage, String> {
let p = Params::new(value);
Expand Down
Loading