From c04c4ba68d9ef7f3aa2452fe654856b0fe6d552a Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Sat, 18 Apr 2026 13:34:18 -0700 Subject: [PATCH 1/4] Add pluggable auth chain, session extensions, and lifecycle observability Replaces the dev sender header with a bearer-only auth pipeline (static + JWT/JWKS resolvers), adds a non-fatal session extensions framework, and exposes session lifecycle via ListSessions/WatchSessions RPCs. Tracks proto changes that split SessionStartPayload.context into context_id + extensions map. --- Cargo.lock | 233 +++++++++++++++++++++++++- Cargo.toml | 9 +- src/auth/chain.rs | 41 +++++ src/auth/mod.rs | 6 + src/auth/resolver.rs | 61 +++++++ src/auth/resolvers/jwt_bearer.rs | 250 ++++++++++++++++++++++++++++ src/auth/resolvers/mod.rs | 5 + src/auth/resolvers/static_bearer.rs | 68 ++++++++ src/bin/support/common.rs | 9 +- src/extensions/mod.rs | 5 + src/extensions/provider.rs | 43 +++++ src/extensions/registry.rs | 64 +++++++ src/lib.rs | 2 + src/main.rs | 7 +- src/mode/decision.rs | 3 +- src/mode/handoff.rs | 3 +- src/mode/multi_round.rs | 3 +- src/mode/passthrough.rs | 3 +- src/mode/proposal.rs | 3 +- src/mode/quorum.rs | 3 +- src/mode/task.rs | 3 +- src/registry.rs | 14 +- src/replay.rs | 6 +- src/runtime.rs | 66 +++++++- src/security.rs | 172 +++++++++++++------ src/server.rs | 198 ++++++++++++++++------ src/session.rs | 6 +- src/storage/file.rs | 3 +- src/storage/memory.rs | 3 +- src/storage/migration.rs | 3 +- src/storage/recovery.rs | 3 +- src/storage/redis_backend.rs | 3 +- src/storage/rocksdb.rs | 3 +- tests/concurrent_messages.rs | 3 +- tests/conformance_loader.rs | 3 +- tests/file_backend_integration.rs | 3 +- tests/integration_mode_lifecycle.rs | 3 +- tests/replay_round_trip.rs | 3 +- tests/stream_integration.rs | 3 +- 39 files changed, 1166 insertions(+), 156 deletions(-) create mode 100644 src/auth/chain.rs create mode 100644 src/auth/mod.rs create mode 100644 src/auth/resolver.rs create mode 100644 src/auth/resolvers/jwt_bearer.rs create mode 100644 src/auth/resolvers/mod.rs create mode 100644 src/auth/resolvers/static_bearer.rs create mode 100644 src/extensions/mod.rs create mode 100644 src/extensions/provider.rs create mode 100644 src/extensions/registry.rs diff --git a/Cargo.lock b/Cargo.lock index 065147d..2cafbad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -338,6 +338,15 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "deranged" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +dependencies = [ + "powerfmt", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -480,8 +489,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -725,13 +736,16 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ + "base64 0.22.1", "bytes", "futures-channel", "futures-util", "http 1.4.0", "http-body 1.0.1", "hyper 1.8.1", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2 0.6.2", "tokio", @@ -893,6 +907,22 @@ dependencies = [ "serde_core", ] +[[package]] +name = "ipnet" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" + +[[package]] +name = "iri-string" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "itertools" version = "0.12.1" @@ -937,6 +967,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1038,8 +1083,6 @@ dependencies = [ [[package]] name = "macp-proto" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9f86838342731bb2edf1377543c4528f52e8bcbe13682aaaa00e1ca886b84a0" [[package]] name = "macp-runtime" @@ -1049,6 +1092,7 @@ dependencies = [ "async-trait", "chrono", "futures-core", + "jsonwebtoken", "macp-proto", "opentelemetry", "opentelemetry-otlp", @@ -1056,11 +1100,12 @@ dependencies = [ "prost 0.14.3", "prost-types", "redis", + "reqwest", "rocksdb", "serde", "serde_json", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-stream", "tonic 0.14.5", @@ -1158,6 +1203,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" + [[package]] name = "num-integer" version = "0.1.46" @@ -1193,7 +1244,7 @@ dependencies = [ "js-sys", "once_cell", "pin-project-lite", - "thiserror", + "thiserror 1.0.69", "urlencoding", ] @@ -1211,7 +1262,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry_sdk", "prost 0.12.6", - "thiserror", + "thiserror 1.0.69", "tokio", "tonic 0.11.0", ] @@ -1251,7 +1302,7 @@ dependencies = [ "ordered-float", "percent-encoding", "rand", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-stream", ] @@ -1288,6 +1339,16 @@ dependencies = [ "windows-link", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -1352,6 +1413,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1583,6 +1650,38 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.2", + "tokio", + "tower 0.5.3", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "ring" version = "0.17.14" @@ -1734,6 +1833,18 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1_smol" version = "1.0.1" @@ -1765,6 +1876,18 @@ dependencies = [ "libc", ] +[[package]] +name = "simple_asn1" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror 2.0.18", + "time", +] + [[package]] name = "slab" version = "0.4.12" @@ -1831,6 +1954,9 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] [[package]] name = "synstructure" @@ -1862,7 +1988,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" +dependencies = [ + "thiserror-impl 2.0.18", ] [[package]] @@ -1876,6 +2011,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror-impl" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -1885,6 +2031,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde_core", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + +[[package]] +name = "time-macros" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -2116,6 +2293,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags 2.11.0", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "iri-string", + "pin-project-lite", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -2330,6 +2525,20 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8" +dependencies = [ + "cfg-if", + "futures-util", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.114" @@ -2396,6 +2605,16 @@ dependencies = [ "semver", ] +[[package]] +name = "web-sys" +version = "0.3.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web-time" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index a8e51bd..5127e7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,8 @@ tokio-stream = "0.1" futures-core = "0.3" async-stream = "0.3" async-trait = "0.1" +jsonwebtoken = "9" +reqwest = { version = "0.12", features = ["json"], default-features = false } uuid = { version = "1", features = ["v4"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } @@ -34,10 +36,9 @@ opentelemetry-otlp = { version = "0.15", features = ["tonic"], optional = true } tracing-opentelemetry = { version = "0.23", optional = true } rocksdb = { version = "0.22", optional = true } redis = { version = "0.27", features = ["tokio-comp", "aio"], optional = true } -# Proto definitions from crates.io (exposes DEP_MACP_PROTO_PROTO_DIR to build.rs via links metadata). -# For local proto development, temporarily switch to a path dependency: -# macp-proto = { path = "../multiagentcoordinationprotocol/packages/proto-rust" } -macp-proto = "0.1.0" +# Proto definitions — local path for WatchSessions development. +# Switch back to crates.io after publishing: macp-proto = "0.2.0" +macp-proto = { path = "../multiagentcoordinationprotocol/packages/proto-rust" } [dev-dependencies] tempfile = "3" diff --git a/src/auth/chain.rs b/src/auth/chain.rs new file mode 100644 index 0000000..5e009b3 --- /dev/null +++ b/src/auth/chain.rs @@ -0,0 +1,41 @@ +use super::resolver::AuthResolver; +use crate::error::MacpError; +use crate::security::AuthIdentity; +use tonic::metadata::MetadataMap; + +pub struct AuthResolverChain { + resolvers: Vec>, +} + +impl AuthResolverChain { + pub fn new(resolvers: Vec>) -> Self { + let names: Vec<&str> = resolvers.iter().map(|r| r.name()).collect(); + tracing::info!(chain = ?names, "auth resolver chain initialized"); + Self { resolvers } + } + + pub async fn authenticate(&self, metadata: &MetadataMap) -> Result { + for resolver in &self.resolvers { + match resolver.resolve(metadata).await { + Ok(Some(identity)) => { + tracing::debug!( + resolver = resolver.name(), + sender = %identity.sender, + "authenticated" + ); + return Ok(identity.into()); + } + Ok(None) => continue, + Err(e) => { + tracing::warn!( + resolver = resolver.name(), + error = %e, + "auth resolver rejected credential" + ); + return Err(MacpError::Unauthenticated); + } + } + } + Err(MacpError::Unauthenticated) + } +} diff --git a/src/auth/mod.rs b/src/auth/mod.rs new file mode 100644 index 0000000..943a1be --- /dev/null +++ b/src/auth/mod.rs @@ -0,0 +1,6 @@ +pub mod chain; +pub mod resolver; +pub mod resolvers; + +pub use chain::AuthResolverChain; +pub use resolver::{AuthError, AuthResolver, ResolvedIdentity}; diff --git a/src/auth/resolver.rs b/src/auth/resolver.rs new file mode 100644 index 0000000..fcdee73 --- /dev/null +++ b/src/auth/resolver.rs @@ -0,0 +1,61 @@ +use crate::security::AuthIdentity; +use std::collections::HashSet; +use tonic::metadata::MetadataMap; + +#[derive(Debug)] +pub enum AuthError { + InvalidCredential(String), + Expired, + MissingClaim(String), + FetchFailed(String), +} + +impl std::fmt::Display for AuthError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AuthError::InvalidCredential(msg) => write!(f, "invalid credential: {msg}"), + AuthError::Expired => write!(f, "credential expired"), + AuthError::MissingClaim(claim) => write!(f, "missing required claim: {claim}"), + AuthError::FetchFailed(msg) => write!(f, "key fetch failed: {msg}"), + } + } +} + +impl std::error::Error for AuthError {} + +#[derive(Clone, Debug)] +pub struct ResolvedIdentity { + pub sender: String, + pub allowed_modes: Option>, + pub can_start_sessions: bool, + pub max_open_sessions: Option, + pub can_manage_mode_registry: bool, + pub is_observer: bool, + pub resolver: String, +} + +impl From for AuthIdentity { + fn from(resolved: ResolvedIdentity) -> Self { + AuthIdentity { + sender: resolved.sender, + allowed_modes: resolved.allowed_modes, + can_start_sessions: resolved.can_start_sessions, + max_open_sessions: resolved.max_open_sessions, + can_manage_mode_registry: resolved.can_manage_mode_registry, + is_observer: resolved.is_observer, + } + } +} + +/// Trait for pluggable auth resolvers. +/// +/// Each resolver examines gRPC metadata and returns: +/// - `Ok(Some(identity))` — positive verification, chain stops +/// - `Ok(None)` — not my credential type, chain continues +/// - `Err(e)` — credential is mine but invalid, chain stops with error +#[async_trait::async_trait] +pub trait AuthResolver: Send + Sync { + fn name(&self) -> &str; + + async fn resolve(&self, metadata: &MetadataMap) -> Result, AuthError>; +} diff --git a/src/auth/resolvers/jwt_bearer.rs b/src/auth/resolvers/jwt_bearer.rs new file mode 100644 index 0000000..c13380d --- /dev/null +++ b/src/auth/resolvers/jwt_bearer.rs @@ -0,0 +1,250 @@ +use crate::auth::resolver::{AuthError, AuthResolver, ResolvedIdentity}; +use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; +use serde::Deserialize; +use std::sync::Arc; +use tokio::sync::RwLock; +use tonic::metadata::MetadataMap; + +#[derive(Debug, Clone, Deserialize)] +struct MACPClaims { + sub: String, + #[serde(default)] + macp_scopes: Option, +} + +#[derive(Debug, Clone, Deserialize, Default)] +struct MACPScopes { + #[serde(default)] + can_start_sessions: Option, + #[serde(default)] + can_manage_mode_registry: Option, + #[serde(default)] + is_observer: Option, + #[serde(default)] + allowed_modes: Option>, + #[serde(default)] + max_open_sessions: Option, +} + +#[derive(Debug, Clone)] +pub struct JwtConfig { + pub issuer: String, + pub audience: String, + pub algorithms: Vec, +} + +struct CachedKeys { + keys: Vec, + fetched_at: std::time::Instant, +} + +pub struct JwtBearerResolver { + config: JwtConfig, + jwks_source: JwksSource, + cached_keys: Arc>>, + cache_ttl: std::time::Duration, +} + +enum JwksSource { + Inline(Vec), + Url(String), +} + +impl JwtBearerResolver { + pub fn from_inline_json(config: JwtConfig, jwks_json: &str) -> Result { + let jwks: serde_json::Value = + serde_json::from_str(jwks_json).map_err(|e| format!("invalid JWKS JSON: {e}"))?; + let keys = Self::parse_jwks(&jwks)?; + tracing::info!( + keys = keys.len(), + issuer = %config.issuer, + "JWT resolver initialized with inline JWKS" + ); + Ok(Self { + config, + jwks_source: JwksSource::Inline(keys.clone()), + cached_keys: Arc::new(RwLock::new(Some(CachedKeys { + keys, + fetched_at: std::time::Instant::now(), + }))), + cache_ttl: std::time::Duration::from_secs(u64::MAX), + }) + } + + pub fn from_url(config: JwtConfig, url: String, cache_ttl_secs: u64) -> Self { + tracing::info!( + url = %url, + issuer = %config.issuer, + cache_ttl_secs, + "JWT resolver initialized with JWKS URL" + ); + Self { + config, + jwks_source: JwksSource::Url(url), + cached_keys: Arc::new(RwLock::new(None)), + cache_ttl: std::time::Duration::from_secs(cache_ttl_secs), + } + } + + fn extract_bearer(metadata: &MetadataMap) -> Option { + metadata + .get("authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")) + .map(str::to_string) + } + + async fn get_keys(&self) -> Result, AuthError> { + { + let guard = self.cached_keys.read().await; + if let Some(cached) = guard.as_ref() { + if cached.fetched_at.elapsed() < self.cache_ttl { + return Ok(cached.keys.clone()); + } + } + } + + match &self.jwks_source { + JwksSource::Inline(keys) => Ok(keys.clone()), + JwksSource::Url(url) => { + let keys = self.fetch_jwks(url).await?; + let mut guard = self.cached_keys.write().await; + *guard = Some(CachedKeys { + keys: keys.clone(), + fetched_at: std::time::Instant::now(), + }); + Ok(keys) + } + } + } + + async fn fetch_jwks(&self, url: &str) -> Result, AuthError> { + let resp = reqwest::get(url) + .await + .map_err(|e| AuthError::FetchFailed(format!("JWKS fetch failed: {e}")))?; + let jwks: serde_json::Value = resp + .json() + .await + .map_err(|e| AuthError::FetchFailed(format!("JWKS parse failed: {e}")))?; + Self::parse_jwks(&jwks).map_err(AuthError::FetchFailed) + } + + fn parse_jwks(jwks: &serde_json::Value) -> Result, String> { + let keys_arr = jwks + .get("keys") + .and_then(|k| k.as_array()) + .ok_or_else(|| "JWKS missing 'keys' array".to_string())?; + + let mut decoding_keys = Vec::new(); + for key in keys_arr { + let kty = key.get("kty").and_then(|v| v.as_str()).unwrap_or(""); + match kty { + "RSA" => { + let n = key.get("n").and_then(|v| v.as_str()).unwrap_or(""); + let e = key.get("e").and_then(|v| v.as_str()).unwrap_or(""); + if !n.is_empty() && !e.is_empty() { + if let Ok(dk) = DecodingKey::from_rsa_components(n, e) { + decoding_keys.push(dk); + } + } + } + "EC" => { + let x = key.get("x").and_then(|v| v.as_str()).unwrap_or(""); + let y = key.get("y").and_then(|v| v.as_str()).unwrap_or(""); + let crv = key.get("crv").and_then(|v| v.as_str()).unwrap_or("P-256"); + if !x.is_empty() && !y.is_empty() { + if let Ok(dk) = DecodingKey::from_ec_components(x, y) { + let _ = crv; + decoding_keys.push(dk); + } + } + } + "oct" => { + if let Some(k_val) = key.get("k").and_then(|v| v.as_str()) { + decoding_keys.push( + DecodingKey::from_base64_secret(k_val) + .unwrap_or_else(|_| DecodingKey::from_secret(k_val.as_bytes())), + ); + } + } + _ => {} + } + } + + if decoding_keys.is_empty() { + return Err("no usable keys found in JWKS".to_string()); + } + Ok(decoding_keys) + } +} + +#[async_trait::async_trait] +impl AuthResolver for JwtBearerResolver { + fn name(&self) -> &str { + "jwt_bearer" + } + + async fn resolve(&self, metadata: &MetadataMap) -> Result, AuthError> { + let token = match Self::extract_bearer(metadata) { + Some(t) => t, + None => return Ok(None), + }; + + // Only handle JWT-shaped tokens (contain dots) + if !token.contains('.') { + return Ok(None); + } + + let keys = self.get_keys().await?; + + let mut validation = Validation::new(Algorithm::RS256); + validation.set_issuer(&[&self.config.issuer]); + validation.set_audience(&[&self.config.audience]); + validation.algorithms = self.config.algorithms.clone(); + + let mut last_err = None; + for key in &keys { + match decode::(&token, key, &validation) { + Ok(token_data) => { + let claims = token_data.claims; + let scopes = claims.macp_scopes.unwrap_or_default(); + + return Ok(Some(ResolvedIdentity { + sender: claims.sub, + allowed_modes: scopes.allowed_modes.map(|m| m.into_iter().collect()), + can_start_sessions: scopes.can_start_sessions.unwrap_or(true), + max_open_sessions: scopes.max_open_sessions, + can_manage_mode_registry: scopes.can_manage_mode_registry.unwrap_or(false), + is_observer: scopes.is_observer.unwrap_or(false), + resolver: "jwt_bearer".to_string(), + })); + } + Err(e) => { + last_err = Some(e); + continue; + } + } + } + + match last_err { + Some(e) => { + use jsonwebtoken::errors::ErrorKind; + match e.kind() { + ErrorKind::ExpiredSignature => Err(AuthError::Expired), + ErrorKind::InvalidIssuer => { + Err(AuthError::InvalidCredential("invalid issuer".to_string())) + } + ErrorKind::InvalidAudience => { + Err(AuthError::InvalidCredential("invalid audience".to_string())) + } + _ => Err(AuthError::InvalidCredential(format!( + "JWT validation failed: {e}" + ))), + } + } + None => Err(AuthError::InvalidCredential( + "no keys available to validate JWT".to_string(), + )), + } + } +} diff --git a/src/auth/resolvers/mod.rs b/src/auth/resolvers/mod.rs new file mode 100644 index 0000000..0405300 --- /dev/null +++ b/src/auth/resolvers/mod.rs @@ -0,0 +1,5 @@ +pub mod jwt_bearer; +pub mod static_bearer; + +pub use jwt_bearer::JwtBearerResolver; +pub use static_bearer::StaticBearerResolver; diff --git a/src/auth/resolvers/static_bearer.rs b/src/auth/resolvers/static_bearer.rs new file mode 100644 index 0000000..99e756c --- /dev/null +++ b/src/auth/resolvers/static_bearer.rs @@ -0,0 +1,68 @@ +use crate::auth::resolver::{AuthError, AuthResolver, ResolvedIdentity}; +use crate::security::AuthIdentity; +use std::collections::HashMap; +use tonic::metadata::MetadataMap; + +/// Resolves opaque bearer tokens against a pre-loaded identity map. +/// This is the existing `MACP_AUTH_TOKENS_JSON` mechanism. +pub struct StaticBearerResolver { + identities: HashMap, +} + +impl StaticBearerResolver { + pub fn new(identities: HashMap) -> Self { + tracing::info!( + count = identities.len(), + "static bearer resolver initialized" + ); + Self { identities } + } + + fn extract_bearer(metadata: &MetadataMap) -> Option { + metadata + .get("authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")) + .map(str::to_string) + .or_else(|| { + metadata + .get("x-macp-token") + .and_then(|v| v.to_str().ok()) + .map(str::to_string) + }) + } +} + +#[async_trait::async_trait] +impl AuthResolver for StaticBearerResolver { + fn name(&self) -> &str { + "static_bearer" + } + + async fn resolve(&self, metadata: &MetadataMap) -> Result, AuthError> { + let token = match Self::extract_bearer(metadata) { + Some(t) => t, + None => return Ok(None), + }; + + // JWT-shaped tokens (contain dots) are not ours — defer to JWT resolver + if token.contains('.') { + return Ok(None); + } + + match self.identities.get(&token) { + Some(identity) => Ok(Some(ResolvedIdentity { + sender: identity.sender.clone(), + allowed_modes: identity.allowed_modes.clone(), + can_start_sessions: identity.can_start_sessions, + max_open_sessions: identity.max_open_sessions, + can_manage_mode_registry: identity.can_manage_mode_registry, + is_observer: identity.is_observer, + resolver: "static_bearer".to_string(), + })), + None => Err(AuthError::InvalidCredential( + "token not found in identity map".to_string(), + )), + } + } +} diff --git a/src/bin/support/common.rs b/src/bin/support/common.rs index 202658e..ab9d0e1 100644 --- a/src/bin/support/common.rs +++ b/src/bin/support/common.rs @@ -28,8 +28,10 @@ pub async fn connect_client( fn with_sender(sender: &str, inner: T) -> Request { let mut request = Request::new(inner); request.metadata_mut().insert( - "x-macp-agent-id", - sender.parse().expect("valid sender header"), + "authorization", + format!("Bearer {sender}") + .parse() + .expect("valid auth header"), ); request } @@ -42,8 +44,9 @@ pub fn canonical_start_payload(intent: &str, participants: &[&str], ttl_ms: i64) configuration_version: CONFIG_VERSION.into(), policy_version: POLICY_VERSION.into(), ttl_ms, - context: vec![], roots: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), } .encode_to_vec() } diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs new file mode 100644 index 0000000..818ab47 --- /dev/null +++ b/src/extensions/mod.rs @@ -0,0 +1,5 @@ +pub mod provider; +pub mod registry; + +pub use provider::{ExtensionError, SessionExtensionProvider, SessionOutcome}; +pub use registry::ExtensionProviderRegistry; diff --git a/src/extensions/provider.rs b/src/extensions/provider.rs new file mode 100644 index 0000000..d2ddb34 --- /dev/null +++ b/src/extensions/provider.rs @@ -0,0 +1,43 @@ +use std::collections::HashMap; + +#[derive(Debug)] +pub enum ExtensionError { + Internal(String), +} + +impl std::fmt::Display for ExtensionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ExtensionError::Internal(msg) => write!(f, "extension error: {msg}"), + } + } +} + +impl std::error::Error for ExtensionError {} + +pub enum SessionOutcome { + Resolved, + Expired, +} + +/// Trait for pluggable session extension providers. +/// +/// Providers receive lifecycle callbacks for sessions that carry their +/// declared key in the `extensions` map. Provider errors are never fatal +/// to session lifecycle (invariant E-1). +#[async_trait::async_trait] +pub trait SessionExtensionProvider: Send + Sync { + fn key(&self) -> &str; + + async fn on_session_start( + &self, + session_id: &str, + extensions: &HashMap>, + ) -> Result<(), ExtensionError>; + + async fn on_session_terminal( + &self, + session_id: &str, + outcome: SessionOutcome, + ) -> Result<(), ExtensionError>; +} diff --git a/src/extensions/registry.rs b/src/extensions/registry.rs new file mode 100644 index 0000000..876aa28 --- /dev/null +++ b/src/extensions/registry.rs @@ -0,0 +1,64 @@ +use super::provider::{SessionExtensionProvider, SessionOutcome}; +use std::collections::HashMap; + +pub struct ExtensionProviderRegistry { + providers: Vec>, +} + +impl ExtensionProviderRegistry { + pub fn new() -> Self { + Self { + providers: Vec::new(), + } + } + + pub fn register(&mut self, provider: Box) { + tracing::info!(key = provider.key(), "registered extension provider"); + self.providers.push(provider); + } + + pub async fn on_session_start(&self, session_id: &str, extensions: &HashMap>) { + for provider in &self.providers { + if !extensions.contains_key(provider.key()) { + continue; + } + if let Err(e) = provider.on_session_start(session_id, extensions).await { + tracing::warn!( + key = provider.key(), + session_id, + error = %e, + "extension provider on_session_start failed (non-fatal)" + ); + } + } + } + + pub async fn on_session_terminal(&self, session_id: &str, outcome: SessionOutcome) { + for provider in &self.providers { + if let Err(e) = provider + .on_session_terminal(session_id, outcome_ref(&outcome)) + .await + { + tracing::warn!( + key = provider.key(), + session_id, + error = %e, + "extension provider on_session_terminal failed (non-fatal)" + ); + } + } + } +} + +impl Default for ExtensionProviderRegistry { + fn default() -> Self { + Self::new() + } +} + +fn outcome_ref(outcome: &SessionOutcome) -> SessionOutcome { + match outcome { + SessionOutcome::Resolved => SessionOutcome::Resolved, + SessionOutcome::Expired => SessionOutcome::Expired, + } +} diff --git a/src/lib.rs b/src/lib.rs index 6913792..8d2b8bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,4 +35,6 @@ pub mod session; pub mod storage; pub mod stream_bus; +pub mod auth; +pub mod extensions; pub mod security; diff --git a/src/main.rs b/src/main.rs index 5081535..403d0cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -291,12 +291,7 @@ async fn main() -> Result<(), Box> { ); builder } - _ => { - return Err( - "TLS is required unless MACP_ALLOW_INSECURE=1 and MACP_ALLOW_DEV_SENDER_HEADER=1" - .into(), - ) - } + _ => return Err("TLS is required unless MACP_ALLOW_INSECURE=1 is set".into()), }; // Set up gRPC health check service diff --git a/src/mode/decision.rs b/src/mode/decision.rs index c28d9e1..b01b450 100644 --- a/src/mode/decision.rs +++ b/src/mode/decision.rs @@ -326,7 +326,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: "policy-1".into(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "agent://orchestrator".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/mode/handoff.rs b/src/mode/handoff.rs index 3bbc661..05b5523 100644 --- a/src/mode/handoff.rs +++ b/src/mode/handoff.rs @@ -284,7 +284,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "config".into(), policy_version: "policy".into(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "owner".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/mode/multi_round.rs b/src/mode/multi_round.rs index 049aa91..5e24d3c 100644 --- a/src/mode/multi_round.rs +++ b/src/mode/multi_round.rs @@ -197,7 +197,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: String::new(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "coordinator".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/mode/passthrough.rs b/src/mode/passthrough.rs index c71225a..67b06d3 100644 --- a/src/mode/passthrough.rs +++ b/src/mode/passthrough.rs @@ -83,7 +83,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: String::new(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "alice".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/mode/proposal.rs b/src/mode/proposal.rs index 75af97c..a9ac501 100644 --- a/src/mode/proposal.rs +++ b/src/mode/proposal.rs @@ -403,7 +403,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: "policy-1".into(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "agent://buyer".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/mode/quorum.rs b/src/mode/quorum.rs index e3f539a..63b000a 100644 --- a/src/mode/quorum.rs +++ b/src/mode/quorum.rs @@ -280,7 +280,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "config".into(), policy_version: "policy".into(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "coordinator".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/mode/task.rs b/src/mode/task.rs index e2217b9..063ab70 100644 --- a/src/mode/task.rs +++ b/src/mode/task.rs @@ -372,7 +372,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "config".into(), policy_version: "policy".into(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "planner".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/registry.rs b/src/registry.rs index 93de849..edd0df1 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -29,7 +29,10 @@ pub(crate) struct PersistedSession { pub mode_version: String, pub configuration_version: String, pub policy_version: String, - pub context: Vec, + #[serde(default)] + pub context_id: String, + #[serde(default)] + pub extensions: HashMap>, pub roots: Vec, pub initiator_sender: String, #[serde(default)] @@ -58,7 +61,8 @@ impl From<&Session> for PersistedSession { mode_version: session.mode_version.clone(), configuration_version: session.configuration_version.clone(), policy_version: session.policy_version.clone(), - context: session.context.clone(), + context_id: session.context_id.clone(), + extensions: session.extensions.clone(), roots: session .roots .iter() @@ -98,7 +102,8 @@ impl From for Session { mode_version: session.mode_version, configuration_version: session.configuration_version, policy_version: session.policy_version, - context: session.context, + context_id: session.context_id, + extensions: session.extensions, roots: session .roots .into_iter() @@ -242,7 +247,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg".into(), policy_version: "pol".into(), - context: vec![9], + context_id: "test-ctx".to_string(), + extensions: std::collections::HashMap::new(), roots: vec![crate::pb::Root { uri: "root://1".into(), name: "r1".into(), diff --git a/src/replay.rs b/src/replay.rs index 3821dd1..b4a9034 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -222,7 +222,8 @@ fn replay_from_start( mode_version: start_payload.mode_version.clone(), configuration_version: start_payload.configuration_version.clone(), policy_version: start_payload.policy_version.clone(), - context: start_payload.context.clone(), + context_id: start_payload.context_id.clone(), + extensions: start_payload.extensions.clone(), roots: start_payload.roots.clone(), initiator_sender: start_entry.sender.clone(), participant_message_counts: std::collections::HashMap::new(), @@ -268,7 +269,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: "policy-1".into(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() diff --git a/src/runtime.rs b/src/runtime.rs index 92732ad..630d026 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -2,6 +2,7 @@ use chrono::Utc; use std::sync::Arc; use crate::error::MacpError; +use crate::extensions::ExtensionProviderRegistry; use crate::log_store::{EntryKind, LogEntry, LogStore}; use crate::metrics::RuntimeMetrics; use crate::mode_registry::ModeRegistry; @@ -22,14 +23,24 @@ pub struct ProcessResult { pub duplicate: bool, } +#[derive(Clone, Debug)] +pub enum SessionLifecycleEvent { + Created { session_id: String }, + Resolved { session_id: String }, + Expired { session_id: String }, +} + pub struct Runtime { pub storage: Arc, pub registry: Arc, pub log_store: Arc, stream_bus: Arc, signal_bus: tokio::sync::broadcast::Sender, + session_lifecycle_bus: tokio::sync::broadcast::Sender, mode_registry: Arc, policy_registry: Arc, + #[allow(dead_code)] // plumbed for future session-extension providers; register API TBD + extensions: Arc, metrics: Arc, checkpoint_interval: usize, } @@ -75,14 +86,17 @@ impl Runtime { .and_then(|v| v.parse().ok()) .unwrap_or(0); // 0 = disabled by default let (signal_tx, _) = tokio::sync::broadcast::channel(256); + let (session_lifecycle_tx, _) = tokio::sync::broadcast::channel(64); Self { storage, registry, log_store, stream_bus: Arc::new(SessionStreamBus::default()), signal_bus: signal_tx, + session_lifecycle_bus: session_lifecycle_tx, mode_registry, policy_registry, + extensions: Arc::new(ExtensionProviderRegistry::new()), metrics: Arc::new(RuntimeMetrics::new()), checkpoint_interval, } @@ -165,6 +179,12 @@ impl Runtime { self.signal_bus.subscribe() } + pub fn subscribe_session_lifecycle( + &self, + ) -> tokio::sync::broadcast::Receiver { + self.session_lifecycle_bus.subscribe() + } + fn publish_accepted_envelope(&self, env: &Envelope) { if !env.session_id.is_empty() { self.stream_bus.publish(&env.session_id, env.clone()); @@ -233,6 +253,11 @@ impl Runtime { session.state = SessionState::Expired; self.metrics.record_session_expired(&session.mode); tracing::info!(session_id, "session expired via TTL"); + let _ = self + .session_lifecycle_bus + .send(SessionLifecycleEvent::Expired { + session_id: session_id.to_string(), + }); return Ok(true); } Ok(false) @@ -364,7 +389,8 @@ impl Runtime { mode_version: start_payload.mode_version.clone(), configuration_version: start_payload.configuration_version.clone(), policy_version: effective_policy_version, - context: start_payload.context.clone(), + context_id: start_payload.context_id.clone(), + extensions: start_payload.extensions.clone(), roots: start_payload.roots.clone(), initiator_sender: env.sender.clone(), participant_message_counts: std::collections::HashMap::new(), @@ -414,6 +440,11 @@ impl Runtime { ); guard.insert(env.session_id.clone(), session); self.publish_accepted_envelope(env); + let _ = self + .session_lifecycle_bus + .send(SessionLifecycleEvent::Created { + session_id: env.session_id.clone(), + }); Ok(ProcessResult { session_state: result_state, @@ -494,6 +525,11 @@ impl Runtime { if result_state == SessionState::Resolved { self.metrics.record_session_resolved(&session.mode); tracing::info!(session_id = %env.session_id, mode = %session.mode, "session resolved"); + let _ = self + .session_lifecycle_bus + .send(SessionLifecycleEvent::Resolved { + session_id: env.session_id.clone(), + }); } // 3. Best-effort session save + checkpoint @@ -608,6 +644,11 @@ impl Runtime { } self.metrics.record_session_cancelled(&session.mode); tracing::info!(session_id, reason, "session cancelled"); + let _ = self + .session_lifecycle_bus + .send(SessionLifecycleEvent::Expired { + session_id: session_id.to_string(), + }); Ok(ProcessResult { session_state: SessionState::Expired, @@ -721,6 +762,11 @@ impl Runtime { self.force_insert_checkpoint(session_id, session).await; } tracing::info!(session_id, "session expired via background cleanup"); + let _ = self + .session_lifecycle_bus + .send(SessionLifecycleEvent::Expired { + session_id: session_id.clone(), + }); } } @@ -786,7 +832,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 1_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() @@ -931,7 +978,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 1, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); @@ -1095,7 +1143,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 1, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); @@ -1558,7 +1607,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 1, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); @@ -1611,7 +1661,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 1, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); @@ -1719,7 +1770,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); diff --git a/src/security.rs b/src/security.rs index a427927..d499400 100644 --- a/src/security.rs +++ b/src/security.rs @@ -14,6 +14,7 @@ pub struct AuthIdentity { pub can_start_sessions: bool, pub max_open_sessions: Option, pub can_manage_mode_registry: bool, + pub is_observer: bool, } #[derive(Clone, Debug, serde::Deserialize)] @@ -27,6 +28,8 @@ struct RawIdentity { max_open_sessions: Option, #[serde(default)] can_manage_mode_registry: bool, + #[serde(default)] + is_observer: bool, } #[derive(Clone, Debug, serde::Deserialize)] @@ -56,18 +59,21 @@ struct RateBucket { pub struct SecurityLayer { identities: Arc>, rate_bucket: Arc, - allow_dev_sender_header: bool, + auth_chain: Option>, pub max_payload_bytes: usize, session_start_rate: RateLimitConfig, message_rate: RateLimitConfig, } impl SecurityLayer { + /// Creates a test-friendly SecurityLayer that maps any bearer token + /// `"tok-"` to an identity with `sender = `. + /// For tests, use `Authorization: Bearer agent://name` to authenticate as `agent://name`. pub fn dev_mode() -> Self { Self { identities: Arc::new(HashMap::new()), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: true, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: usize::MAX, @@ -80,6 +86,22 @@ impl SecurityLayer { } } + /// Dev-mode authenticate: accepts any bearer token as the sender identity. + /// This is used ONLY in tests (dev_mode), not in production (from_env). + fn dev_authenticate(&self, metadata: &MetadataMap) -> Result { + if let Some(token) = Self::bearer_token(metadata) { + return Ok(AuthIdentity { + sender: token, + allowed_modes: None, + can_start_sessions: true, + max_open_sessions: None, + can_manage_mode_registry: true, + is_observer: false, + }); + } + Err(MacpError::Unauthenticated) + } + pub fn from_env() -> Result> { let max_payload_bytes = std::env::var("MACP_MAX_PAYLOAD_BYTES") .ok() @@ -101,11 +123,6 @@ impl SecurityLayer { window: Duration::from_secs(60), }; - let allow_dev_sender_header = std::env::var("MACP_ALLOW_DEV_SENDER_HEADER") - .ok() - .as_deref() - == Some("1"); - let raw = if let Ok(json) = std::env::var("MACP_AUTH_TOKENS_JSON") { Some(json) } else if let Ok(path) = std::env::var("MACP_AUTH_TOKENS_FILE") { @@ -115,14 +132,66 @@ impl SecurityLayer { }; let identities = raw - .map(|json| Self::parse_identities(&json)) + .as_ref() + .map(|json| Self::parse_identities(json)) .transpose()? .unwrap_or_default(); + // Build auth resolver chain + let mut resolvers: Vec> = Vec::new(); + + // JWT resolver (if configured) + if let Ok(issuer) = std::env::var("MACP_AUTH_ISSUER") { + let audience = + std::env::var("MACP_AUTH_AUDIENCE").unwrap_or_else(|_| "macp-runtime".into()); + let cache_ttl = std::env::var("MACP_AUTH_JWKS_TTL_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(300u64); + let config = crate::auth::resolvers::jwt_bearer::JwtConfig { + issuer, + audience, + algorithms: vec![ + jsonwebtoken::Algorithm::RS256, + jsonwebtoken::Algorithm::ES256, + jsonwebtoken::Algorithm::HS256, + ], + }; + if let Ok(jwks_json) = std::env::var("MACP_AUTH_JWKS_JSON") { + match crate::auth::resolvers::JwtBearerResolver::from_inline_json( + config, &jwks_json, + ) { + Ok(resolver) => resolvers.push(Box::new(resolver)), + Err(e) => { + tracing::error!("failed to create JWT resolver from inline JWKS: {e}") + } + } + } else if let Ok(jwks_url) = std::env::var("MACP_AUTH_JWKS_URL") { + resolvers.push(Box::new( + crate::auth::resolvers::JwtBearerResolver::from_url( + config, jwks_url, cache_ttl, + ), + )); + } + } + + // Static bearer resolver (always present if tokens are configured) + if !identities.is_empty() { + resolvers.push(Box::new(crate::auth::resolvers::StaticBearerResolver::new( + identities.clone(), + ))); + } + + let auth_chain = if resolvers.is_empty() { + None + } else { + Some(Arc::new(crate::auth::AuthResolverChain::new(resolvers))) + }; + Ok(Self { identities: Arc::new(identities), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header, + auth_chain, max_payload_bytes, session_start_rate, message_rate, @@ -151,6 +220,7 @@ impl SecurityLayer { can_start_sessions: item.can_start_sessions, max_open_sessions: item.max_open_sessions, can_manage_mode_registry: item.can_manage_mode_registry, + is_observer: item.is_observer, }, ); } @@ -172,30 +242,29 @@ impl SecurityLayer { } pub fn authenticate_metadata(&self, metadata: &MetadataMap) -> Result { - if let Some(token) = Self::bearer_token(metadata) { - return self - .identities - .get(&token) - .cloned() - .ok_or(MacpError::Unauthenticated); + // Production path: use the auth resolver chain. + if let Some(chain) = &self.auth_chain { + let chain = Arc::clone(chain); + let metadata_clone = metadata.clone(); + return tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(chain.authenticate(&metadata_clone)) + }); } - if self.allow_dev_sender_header { - if let Some(sender) = metadata - .get("x-macp-agent-id") - .and_then(|value| value.to_str().ok()) - { - return Ok(AuthIdentity { - sender: sender.to_string(), - allowed_modes: None, - can_start_sessions: true, - max_open_sessions: None, - can_manage_mode_registry: true, - }); + // Explicit identity map (layer_with_tokens in tests) + if !self.identities.is_empty() { + if let Some(token) = Self::bearer_token(metadata) { + return self + .identities + .get(&token) + .cloned() + .ok_or(MacpError::Unauthenticated); } + return Err(MacpError::Unauthenticated); } - Err(MacpError::Unauthenticated) + // Dev-mode: any bearer token → identity (for tests only) + self.dev_authenticate(metadata) } pub fn authorize_mode( @@ -294,7 +363,7 @@ mod tests { SecurityLayer { identities: Arc::new(identities), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: false, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: usize::MAX, @@ -312,7 +381,7 @@ mod tests { SecurityLayer { identities: Arc::new(HashMap::new()), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: false, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: usize::MAX, @@ -338,12 +407,12 @@ mod tests { } #[test] - fn dev_mode_allows_dev_sender_header() { + fn dev_mode_rejects_dev_sender_header() { let layer = SecurityLayer::dev_mode(); let mut meta = MetadataMap::new(); meta.insert("x-macp-agent-id", "agent://dev-bot".parse().unwrap()); - let id = layer.authenticate_metadata(&meta).expect("should succeed"); - assert_eq!(id.sender, "agent://dev-bot"); + let err = layer.authenticate_metadata(&meta).unwrap_err(); + assert!(matches!(err, MacpError::Unauthenticated)); } #[test] @@ -480,11 +549,11 @@ mod tests { // --------------------------------------------------------------- #[test] - fn dev_sender_header_extracted_when_allowed() { + fn dev_sender_header_rejected_without_chain() { let layer = SecurityLayer { identities: Arc::new(HashMap::new()), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: true, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: usize::MAX, @@ -499,11 +568,8 @@ mod tests { let mut meta = MetadataMap::new(); meta.insert("x-macp-agent-id", "agent://dev-agent".parse().unwrap()); - let id = layer.authenticate_metadata(&meta).expect("should succeed"); - assert_eq!(id.sender, "agent://dev-agent"); - assert!(id.allowed_modes.is_none()); - assert!(id.can_start_sessions); - assert!(id.max_open_sessions.is_none()); + let err = layer.authenticate_metadata(&meta).unwrap_err(); + assert!(matches!(err, MacpError::Unauthenticated)); } #[test] @@ -512,7 +578,7 @@ mod tests { let layer = SecurityLayer { identities: Arc::new(HashMap::new()), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: false, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: usize::MAX, @@ -539,7 +605,7 @@ mod tests { let layer = SecurityLayer { identities: Arc::new(identities), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: true, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: usize::MAX, @@ -574,6 +640,7 @@ mod tests { can_start_sessions: true, max_open_sessions: None, can_manage_mode_registry: false, + is_observer: false, }; assert!(layer .authorize_mode(&id, "macp.mode.decision.v1", false) @@ -594,6 +661,7 @@ mod tests { can_start_sessions: true, max_open_sessions: None, can_manage_mode_registry: false, + is_observer: false, }; assert!(layer .authorize_mode(&id, "macp.mode.decision.v1", false) @@ -613,6 +681,7 @@ mod tests { can_start_sessions: false, max_open_sessions: None, can_manage_mode_registry: false, + is_observer: false, }; let err = layer .authorize_mode(&id, "macp.mode.decision.v1", true) @@ -629,6 +698,7 @@ mod tests { can_start_sessions: false, max_open_sessions: None, can_manage_mode_registry: false, + is_observer: false, }; // Regular messages (not session start) should succeed assert!(layer @@ -648,6 +718,7 @@ mod tests { can_start_sessions: false, max_open_sessions: None, can_manage_mode_registry: false, + is_observer: false, }; // Cannot start sessions (checked first) @@ -676,6 +747,7 @@ mod tests { allowed_modes: None, can_start_sessions: true, max_open_sessions: None, + is_observer: false, can_manage_mode_registry: false, }; let err = layer.authorize_mode_registry(&id).unwrap_err(); @@ -683,10 +755,12 @@ mod tests { } #[test] - fn dev_sender_header_can_manage_mode_registry() { - let layer = SecurityLayer::dev_mode(); + fn bearer_token_can_manage_mode_registry() { + let json = + r#"[{"token":"admin-tok","sender":"agent://admin","can_manage_mode_registry":true}]"#; + let layer = layer_with_tokens(json); let mut meta = MetadataMap::new(); - meta.insert("x-macp-agent-id", "agent://dev-admin".parse().unwrap()); + meta.insert("authorization", "Bearer admin-tok".parse().unwrap()); let id = layer.authenticate_metadata(&meta).unwrap(); assert!(layer.authorize_mode_registry(&id).is_ok()); } @@ -700,7 +774,7 @@ mod tests { let layer = SecurityLayer { identities: Arc::new(HashMap::new()), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: false, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: 3, @@ -730,7 +804,7 @@ mod tests { let layer = SecurityLayer { identities: Arc::new(HashMap::new()), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: false, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: usize::MAX, @@ -757,7 +831,7 @@ mod tests { let layer = SecurityLayer { identities: Arc::new(HashMap::new()), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: false, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: 1, @@ -782,7 +856,7 @@ mod tests { let layer = SecurityLayer { identities: Arc::new(HashMap::new()), rate_bucket: Arc::new(RateBucket::default()), - allow_dev_sender_header: false, + auth_chain: None, max_payload_bytes: 1_048_576, session_start_rate: RateLimitConfig { limit: 1, diff --git a/src/server.rs b/src/server.rs index 72b1424..064a56e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,20 +1,22 @@ use macp_runtime::error::MacpError; use macp_runtime::pb::macp_runtime_service_server::MacpRuntimeService; use macp_runtime::pb::{ - Ack, CancelSessionRequest, CancelSessionResponse, CancellationCapability, Capabilities, - Envelope, GetManifestRequest, GetManifestResponse, GetPolicyRequest, GetPolicyResponse, - GetSessionRequest, GetSessionResponse, InitializeRequest, InitializeResponse, - ListExtModesRequest, ListExtModesResponse, ListModesRequest, ListModesResponse, - ListPoliciesRequest, ListPoliciesResponse, ListRootsRequest, ListRootsResponse, - MacpError as PbMacpError, ManifestCapability, ModeRegistryCapability, ParticipantActivity, - PolicyDescriptor, PolicyRegistryCapability, ProgressCapability, PromoteModeRequest, - PromoteModeResponse, RegisterExtModeRequest, RegisterExtModeResponse, RegisterPolicyRequest, - RegisterPolicyResponse, RootsCapability, RuntimeInfo, SendRequest, SendResponse, + session_lifecycle_event, Ack, CancelSessionRequest, CancelSessionResponse, + CancellationCapability, Capabilities, Envelope, GetManifestRequest, GetManifestResponse, + GetPolicyRequest, GetPolicyResponse, GetSessionRequest, GetSessionResponse, InitializeRequest, + InitializeResponse, ListExtModesRequest, ListExtModesResponse, ListModesRequest, + ListModesResponse, ListPoliciesRequest, ListPoliciesResponse, ListRootsRequest, + ListRootsResponse, ListSessionsRequest, ListSessionsResponse, MacpError as PbMacpError, + ManifestCapability, ModeRegistryCapability, ParticipantActivity, PolicyDescriptor, + PolicyRegistryCapability, ProgressCapability, PromoteModeRequest, PromoteModeResponse, + RegisterExtModeRequest, RegisterExtModeResponse, RegisterPolicyRequest, RegisterPolicyResponse, + RootsCapability, RuntimeInfo, SendRequest, SendResponse, SessionLifecycleEvent, SessionMetadata, SessionState as PbSessionState, SessionsCapability, StreamSessionRequest, StreamSessionResponse, UnregisterExtModeRequest, UnregisterExtModeResponse, UnregisterPolicyRequest, UnregisterPolicyResponse, WatchModeRegistryRequest, WatchModeRegistryResponse, WatchPoliciesRequest, WatchPoliciesResponse, WatchRootsRequest, - WatchRootsResponse, WatchSignalsRequest, WatchSignalsResponse, + WatchRootsResponse, WatchSessionsRequest, WatchSessionsResponse, WatchSignalsRequest, + WatchSignalsResponse, }; use macp_runtime::runtime::Runtime; use macp_runtime::security::{AuthIdentity, SecurityLayer}; @@ -84,6 +86,37 @@ impl MacpServer { } } + fn session_to_metadata(session: &macp_runtime::session::Session) -> SessionMetadata { + let participant_activity = session + .participant_message_counts + .iter() + .map(|(pid, count)| ParticipantActivity { + participant_id: pid.clone(), + last_message_at_unix_ms: session + .participant_last_seen + .get(pid) + .copied() + .unwrap_or(0), + message_count: *count, + }) + .collect(); + SessionMetadata { + session_id: session.session_id.clone(), + mode: session.mode.clone(), + state: Self::session_state_to_pb(&session.state), + started_at_unix_ms: session.started_at_unix_ms, + expires_at_unix_ms: session.ttl_expiry, + mode_version: session.mode_version.clone(), + configuration_version: session.configuration_version.clone(), + policy_version: session.policy_version.clone(), + participants: session.participants.clone(), + participant_activity, + initiator: session.initiator_sender.clone(), + context_id: session.context_id.clone(), + extension_keys: session.extensions.keys().cloned().collect(), + } + } + fn make_error_ack(e: &MacpError, env: &Envelope) -> Ack { let details = Self::error_details_bytes(e); Ack { @@ -160,7 +193,8 @@ impl MacpServer { .get_session_checked(session_id) .await .ok_or_else(|| Status::not_found(format!("Session '{}' not found", session_id)))?; - let allowed = session.initiator_sender == identity.sender + let allowed = identity.is_observer + || session.initiator_sender == identity.sender || session.participants.iter().any(|p| p == &identity.sender); if !allowed { return Err(Status::permission_denied( @@ -525,7 +559,7 @@ impl MacpRuntimeService for MacpServer { website_url: String::new(), }), capabilities: Some(Capabilities { - sessions: Some(SessionsCapability { stream: true }), + sessions: Some(SessionsCapability { stream: true, list_sessions: true, watch_sessions: true }), cancellation: Some(CancellationCapability { cancel_session: true, }), @@ -605,34 +639,8 @@ impl MacpRuntimeService for MacpServer { .await .ok_or_else(|| Status::not_found(format!("Session '{}' not found", session_id)))?; - let participant_activity = session - .participant_message_counts - .iter() - .map(|(pid, count)| ParticipantActivity { - participant_id: pid.clone(), - last_message_at_unix_ms: session - .participant_last_seen - .get(pid) - .copied() - .unwrap_or(0), - message_count: *count, - }) - .collect(); - Ok(Response::new(GetSessionResponse { - metadata: Some(SessionMetadata { - session_id: session.session_id.clone(), - mode: session.mode.clone(), - state: Self::session_state_to_pb(&session.state), - started_at_unix_ms: session.started_at_unix_ms, - expires_at_unix_ms: session.ttl_expiry, - mode_version: session.mode_version.clone(), - configuration_version: session.configuration_version.clone(), - policy_version: session.policy_version.clone(), - participants: session.participants.clone(), - participant_activity, - initiator: session.initiator_sender.clone(), - }), + metadata: Some(Self::session_to_metadata(&session)), })) } @@ -812,6 +820,10 @@ impl MacpRuntimeService for MacpServer { Box> + Send>, >; + type WatchSessionsStream = std::pin::Pin< + Box> + Send>, + >; + async fn watch_signals( &self, _request: Request, @@ -827,6 +839,68 @@ impl MacpRuntimeService for MacpServer { Ok(Response::new(Box::pin(stream))) } + // Session lifecycle observation RPCs + + async fn list_sessions( + &self, + request: Request, + ) -> Result, Status> { + let _identity = self + .security + .authenticate_metadata(request.metadata()) + .map_err(Self::status_from_error)?; + let sessions = self.runtime.registry.get_all_sessions().await; + let metadata: Vec = + sessions.iter().map(Self::session_to_metadata).collect(); + Ok(Response::new(ListSessionsResponse { sessions: metadata })) + } + + async fn watch_sessions( + &self, + request: Request, + ) -> Result, Status> { + let _identity = self + .security + .authenticate_metadata(request.metadata()) + .map_err(Self::status_from_error)?; + let mut rx = self.runtime.subscribe_session_lifecycle(); + let runtime = Arc::clone(&self.runtime); + let stream = async_stream::try_stream! { + // Initial sync: emit all current sessions as CREATED events + let sessions = runtime.registry.get_all_sessions().await; + for session in &sessions { + yield WatchSessionsResponse { + event: Some(SessionLifecycleEvent { + event_type: session_lifecycle_event::EventType::Created.into(), + session: Some(Self::session_to_metadata(session)), + observed_at_unix_ms: session.started_at_unix_ms, + }), + }; + } + // Stream lifecycle transitions + while let Ok(event) = rx.recv().await { + let (event_type, sid) = match &event { + macp_runtime::runtime::SessionLifecycleEvent::Created { session_id } => + (session_lifecycle_event::EventType::Created, session_id.clone()), + macp_runtime::runtime::SessionLifecycleEvent::Resolved { session_id } => + (session_lifecycle_event::EventType::Resolved, session_id.clone()), + macp_runtime::runtime::SessionLifecycleEvent::Expired { session_id } => + (session_lifecycle_event::EventType::Expired, session_id.clone()), + }; + let session_meta = runtime.registry.get_session(&sid).await + .map(|s| Self::session_to_metadata(&s)); + yield WatchSessionsResponse { + event: Some(SessionLifecycleEvent { + event_type: event_type.into(), + session: session_meta, + observed_at_unix_ms: chrono::Utc::now().timestamp_millis(), + }), + }; + } + }; + Ok(Response::new(Box::pin(stream))) + } + // Extension mode lifecycle RPCs async fn list_ext_modes( @@ -1061,7 +1135,7 @@ impl MacpServer { let rules: serde_json::Value = if descriptor.rules.is_empty() { serde_json::json!({}) } else { - serde_json::from_slice(&descriptor.rules).unwrap_or_else(|_| serde_json::json!({})) + serde_json::from_str(&descriptor.rules).unwrap_or_else(|_| serde_json::json!({})) }; macp_runtime::policy::PolicyDefinition { policy_id: descriptor.policy_id.clone(), @@ -1079,7 +1153,7 @@ impl MacpServer { policy_id: definition.policy_id.clone(), mode: definition.mode.clone(), description: definition.description.clone(), - rules: serde_json::to_vec(&definition.rules).unwrap_or_default(), + rules: serde_json::to_string(&definition.rules).unwrap_or_default(), schema_version: definition.schema_version, registered_at_unix_ms: 0, } @@ -1114,7 +1188,7 @@ mod tests { envelope: Some(env), }); req.metadata_mut() - .insert("x-macp-agent-id", sender.parse().unwrap()); + .insert("authorization", format!("Bearer {sender}").parse().unwrap()); req } @@ -1131,7 +1205,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 1000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() @@ -1206,8 +1281,10 @@ mod tests { assert!(ack.ok); let mut req = Request::new(GetSessionRequest { session_id: sid }); - req.metadata_mut() - .insert("x-macp-agent-id", "agent://outsider".parse().unwrap()); + req.metadata_mut().insert( + "authorization", + format!("Bearer {}", "agent://outsider").parse().unwrap(), + ); let err = server.get_session(req).await.unwrap_err(); assert_eq!(err.code(), tonic::Code::PermissionDenied); } @@ -1241,6 +1318,7 @@ mod tests { can_start_sessions: true, max_open_sessions: None, can_manage_mode_registry: false, + is_observer: false, } } @@ -1403,8 +1481,12 @@ mod tests { let mut req = Request::new(GetSessionRequest { session_id: sid.clone(), }); - req.metadata_mut() - .insert("x-macp-agent-id", "agent://orchestrator".parse().unwrap()); + req.metadata_mut().insert( + "authorization", + format!("Bearer {}", "agent://orchestrator") + .parse() + .unwrap(), + ); let resp = server.get_session(req).await.unwrap(); let meta = resp.into_inner().metadata.unwrap(); assert_eq!(meta.session_id, sid); @@ -1438,8 +1520,12 @@ mod tests { session_id: sid, reason: "no longer needed".into(), }); - req.metadata_mut() - .insert("x-macp-agent-id", "agent://orchestrator".parse().unwrap()); + req.metadata_mut().insert( + "authorization", + format!("Bearer {}", "agent://orchestrator") + .parse() + .unwrap(), + ); let resp = server.cancel_session(req).await.unwrap(); let ack = resp.into_inner().ack.unwrap(); assert!(ack.ok); @@ -1471,8 +1557,10 @@ mod tests { session_id: sid, reason: "I want to cancel".into(), }); - req.metadata_mut() - .insert("x-macp-agent-id", "agent://fraud".parse().unwrap()); + req.metadata_mut().insert( + "authorization", + format!("Bearer {}", "agent://fraud").parse().unwrap(), + ); let err = server.cancel_session(req).await.unwrap_err(); assert_eq!(err.code(), tonic::Code::PermissionDenied); } @@ -1484,8 +1572,12 @@ mod tests { session_id: "nonexistent".into(), reason: "test".into(), }); - req.metadata_mut() - .insert("x-macp-agent-id", "agent://orchestrator".parse().unwrap()); + req.metadata_mut().insert( + "authorization", + format!("Bearer {}", "agent://orchestrator") + .parse() + .unwrap(), + ); let err = server.cancel_session(req).await.unwrap_err(); assert_eq!(err.code(), tonic::Code::NotFound); } diff --git a/src/session.rs b/src/session.rs index a2d8858..a09a12e 100644 --- a/src/session.rs +++ b/src/session.rs @@ -30,7 +30,8 @@ pub struct Session { pub mode_version: String, pub configuration_version: String, pub policy_version: String, - pub context: Vec, + pub context_id: String, + pub extensions: HashMap>, pub roots: Vec, pub initiator_sender: String, pub participant_message_counts: HashMap, @@ -189,7 +190,8 @@ mod tests { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], }; payload.encode_to_vec() diff --git a/src/storage/file.rs b/src/storage/file.rs index 65394f5..8960834 100644 --- a/src/storage/file.rs +++ b/src/storage/file.rs @@ -180,7 +180,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: "pol-1".into(), - context: vec![9], + context_id: "test-ctx".to_string(), + extensions: std::collections::HashMap::new(), roots: vec![crate::pb::Root { uri: "root://1".into(), name: "r1".into(), diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 939ea84..0c812f2 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -64,7 +64,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: "pol-1".into(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "alice".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/storage/migration.rs b/src/storage/migration.rs index 1af895b..cfdf467 100644 --- a/src/storage/migration.rs +++ b/src/storage/migration.rs @@ -170,7 +170,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: "pol-1".into(), - context: vec![9], + context_id: "test-ctx".to_string(), + extensions: std::collections::HashMap::new(), roots: vec![crate::pb::Root { uri: "root://1".into(), name: "r1".into(), diff --git a/src/storage/recovery.rs b/src/storage/recovery.rs index 1946e5e..02590fe 100644 --- a/src/storage/recovery.rs +++ b/src/storage/recovery.rs @@ -70,7 +70,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: "pol-1".into(), - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "alice".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/storage/redis_backend.rs b/src/storage/redis_backend.rs index e63cefc..0b6fd6f 100644 --- a/src/storage/redis_backend.rs +++ b/src/storage/redis_backend.rs @@ -167,7 +167,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: "pol-1".into(), - context: vec![9], + context_id: "test-ctx".to_string(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "alice".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 65096b1..aa1c083 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -255,7 +255,8 @@ mod tests { mode_version: "1.0.0".into(), configuration_version: "cfg-1".into(), policy_version: "pol-1".into(), - context: vec![9], + context_id: "test-ctx".to_string(), + extensions: std::collections::HashMap::new(), roots: vec![], initiator_sender: "alice".into(), participant_message_counts: std::collections::HashMap::new(), diff --git a/tests/concurrent_messages.rs b/tests/concurrent_messages.rs index 01fbe8d..2a01c7a 100644 --- a/tests/concurrent_messages.rs +++ b/tests/concurrent_messages.rs @@ -25,7 +25,8 @@ fn session_start(participants: Vec) -> Vec { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() diff --git a/tests/conformance_loader.rs b/tests/conformance_loader.rs index 4d1bd6a..effd03f 100644 --- a/tests/conformance_loader.rs +++ b/tests/conformance_loader.rs @@ -355,7 +355,8 @@ async fn run_conformance_fixture(path: &Path) { configuration_version: fixture.configuration_version.clone(), policy_version: fixture.policy_version.clone(), ttl_ms: fixture.ttl_ms, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); diff --git a/tests/file_backend_integration.rs b/tests/file_backend_integration.rs index 8560127..be31a4d 100644 --- a/tests/file_backend_integration.rs +++ b/tests/file_backend_integration.rs @@ -21,7 +21,8 @@ fn session_start(participants: Vec) -> Vec { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() diff --git a/tests/integration_mode_lifecycle.rs b/tests/integration_mode_lifecycle.rs index ac0f2bc..b0b9ec6 100644 --- a/tests/integration_mode_lifecycle.rs +++ b/tests/integration_mode_lifecycle.rs @@ -27,7 +27,8 @@ fn session_start(participants: Vec) -> Vec { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() diff --git a/tests/replay_round_trip.rs b/tests/replay_round_trip.rs index 4ef2cfa..7028cf0 100644 --- a/tests/replay_round_trip.rs +++ b/tests/replay_round_trip.rs @@ -17,7 +17,8 @@ fn start_payload(participants: Vec<&str>) -> Vec { configuration_version: "cfg-1".into(), policy_version: "policy-1".into(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() diff --git a/tests/stream_integration.rs b/tests/stream_integration.rs index 4fc58b9..e3fcf7b 100644 --- a/tests/stream_integration.rs +++ b/tests/stream_integration.rs @@ -25,7 +25,8 @@ fn session_start(participants: Vec) -> Vec { configuration_version: "cfg-1".into(), policy_version: String::new(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() From d1733e2e45f3ce9d9b970dcfed5f590de501d501 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Sat, 18 Apr 2026 14:24:05 -0700 Subject: [PATCH 2/4] Fix Test cases, Fmt and Lint --- README.md | 27 ++- docs/API.md | 9 +- docs/deployment.md | 6 +- docs/examples.md | 5 +- docs/getting-started.md | 19 +- docs/sdk-guide.md | 4 +- integration_tests/Cargo.lock | 163 +++++++++++++++++- integration_tests/src/config.rs | 2 +- integration_tests/src/helpers.rs | 9 +- integration_tests/src/server_manager.rs | 1 - .../tier1_protocol/test_mode_registry.rs | 6 +- .../tier1_protocol/test_policy_registry.rs | 14 +- .../tier1_protocol/test_rfc_cross_cutting.rs | 6 +- .../tier1_protocol/test_validation_gaps.rs | 9 +- .../test_e2e_decision_with_signals.rs | 6 +- 15 files changed, 240 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index 1af526d..7ac56cc 100644 --- a/README.md +++ b/README.md @@ -84,14 +84,16 @@ For all standards-track modes and built-in extensions, `SessionStartPayload` mus In production, requests should be authenticated with a bearer token. The runtime derives `Envelope.sender` from the authenticated identity and rejects spoofed sender values. -For local development, you may opt into insecure/dev mode with: +For local development, opt into insecure transport with: ```bash MACP_ALLOW_INSECURE=1 -MACP_ALLOW_DEV_SENDER_HEADER=1 ``` -When dev header mode is enabled, clients can set `x-macp-agent-id` metadata instead of bearer tokens. +When no auth resolvers are configured (no `MACP_AUTH_TOKENS_*` and no +`MACP_AUTH_ISSUER`), the runtime falls back to dev-mode auth: any +`Authorization: Bearer ` header authenticates the caller as +sender ``. Use only for local development. ### Persistence @@ -115,9 +117,18 @@ Unless `MACP_MEMORY_ONLY=1` is set, the runtime persists session and log snapsho | Variable | Meaning | Default | |---|---|---| -| `MACP_AUTH_TOKENS_JSON` | inline auth config JSON | unset | -| `MACP_AUTH_TOKENS_FILE` | path to auth config JSON | unset | -| `MACP_ALLOW_DEV_SENDER_HEADER` | allow `x-macp-agent-id` for local dev | unset | +| `MACP_AUTH_TOKENS_JSON` | inline static bearer token config JSON | unset | +| `MACP_AUTH_TOKENS_FILE` | path to static bearer token config JSON | unset | +| `MACP_AUTH_ISSUER` | JWT resolver expected `iss` claim (enables JWT auth) | unset | +| `MACP_AUTH_AUDIENCE` | JWT resolver expected `aud` claim | `macp-runtime` | +| `MACP_AUTH_JWKS_JSON` | inline JWKS document used to validate JWTs | unset | +| `MACP_AUTH_JWKS_URL` | JWKS endpoint URL (fetched + cached) | unset | +| `MACP_AUTH_JWKS_TTL_SECS` | JWKS cache TTL when fetched from URL | `300` | + +Auth is layered as a resolver chain: configured JWT first, then static +bearer, with a dev-mode fallback only when both are absent. JWT tokens +supply MACP scopes via a `macp_scopes` claim matching the static token +schema. Token JSON may be either a raw list or an object with a `tokens` array. Example: @@ -170,10 +181,12 @@ cargo run ```bash export MACP_ALLOW_INSECURE=1 -export MACP_ALLOW_DEV_SENDER_HEADER=1 cargo run ``` +With no auth tokens configured, clients authenticate by sending their +sender identity as a bearer token (e.g. `Authorization: Bearer agent://alice`). + ### Running the example clients The example clients in `src/bin` assume the local development startup shown above. diff --git a/docs/API.md b/docs/API.md index b856336..9323bb6 100644 --- a/docs/API.md +++ b/docs/API.md @@ -227,11 +227,12 @@ rpc WatchPolicies(WatchPoliciesRequest) returns (stream WatchPoliciesResponse) ## Authentication -The runtime checks credentials in this order: +The runtime applies a resolver chain in this order: -1. **Bearer token**: `Authorization: Bearer ` or `x-macp-token: ` header. The token is mapped to an `AuthIdentity` via the configured token file. -2. **Dev mode**: `x-macp-agent-id: ` header, only when `MACP_ALLOW_DEV_SENDER_HEADER=1`. Grants all capabilities. -3. **Reject**: Returns `UNAUTHENTICATED`. +1. **JWT bearer** (when `MACP_AUTH_ISSUER` is set): `Authorization: Bearer `. The JWT's `sub` claim becomes the sender; `macp_scopes` carries capability flags (`allowed_modes`, `can_start_sessions`, `max_open_sessions`, `can_manage_mode_registry`, `is_observer`). +2. **Static bearer** (when `MACP_AUTH_TOKENS_*` is set): `Authorization: Bearer ` or `x-macp-token: ` header. The opaque token is mapped to an `AuthIdentity` via the configured token file. +3. **Dev-mode fallback** (when neither JWT nor static bearer is configured): any `Authorization: Bearer ` header authenticates the caller as sender `` with all capabilities. Intended only for local development. +4. **Reject**: Returns `UNAUTHENTICATED`. See the [Getting Started guide](getting-started.md) for token configuration examples. diff --git a/docs/deployment.md b/docs/deployment.md index 4c0adc3..b1a1449 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -29,7 +29,11 @@ Before exposing the runtime to production traffic, ensure these four items are c | `MACP_REDIS_URL` | `redis://127.0.0.1:6379` | Redis connection URL | | `MACP_MEMORY_ONLY` | off | Set to `1` to disable persistence entirely | | `MACP_ALLOW_INSECURE` | off | Allow plaintext connections (development only) | -| `MACP_ALLOW_DEV_SENDER_HEADER` | off | Trust `x-macp-agent-id` header (development only) | +| `MACP_AUTH_ISSUER` | -- | JWT resolver expected `iss` claim (enables JWT auth) | +| `MACP_AUTH_AUDIENCE` | `macp-runtime` | JWT resolver expected `aud` claim | +| `MACP_AUTH_JWKS_JSON` | -- | Inline JWKS document (JSON) for JWT validation | +| `MACP_AUTH_JWKS_URL` | -- | JWKS endpoint URL (fetched + cached) | +| `MACP_AUTH_JWKS_TTL_SECS` | `300` | JWKS cache TTL when fetched from URL | | `MACP_MAX_PAYLOAD_BYTES` | `1048576` | Maximum envelope payload size in bytes | | `MACP_SESSION_START_LIMIT_PER_MINUTE` | `60` | Per-sender session creation rate limit | | `MACP_MESSAGE_LIMIT_PER_MINUTE` | `600` | Per-sender message rate limit | diff --git a/docs/examples.md b/docs/examples.md index 545ebc1..a53d2b4 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -8,11 +8,10 @@ All examples target `macp-runtime v0.4.0` and use the development security short ```bash export MACP_ALLOW_INSECURE=1 -export MACP_ALLOW_DEV_SENDER_HEADER=1 cargo run ``` -The example binaries attach `x-macp-agent-id` metadata so the runtime can derive the authenticated sender. Every example creates a session with the required fields: `participants`, `mode_version` (`"1.0.0"`), `configuration_version` (`"config.default"`), and a positive `ttl_ms`. +The example binaries attach `Authorization: Bearer ` metadata so the runtime derives the authenticated sender from the bearer token. Every example creates a session with the required fields: `participants`, `mode_version` (`"1.0.0"`), `configuration_version` (`"config.default"`), and a positive `ttl_ms`. ## Decision Mode @@ -96,7 +95,7 @@ This client deliberately exercises failure paths: invalid protocol versions, emp ## Troubleshooting -**`UNAUTHENTICATED`**: Start the runtime with `MACP_ALLOW_DEV_SENDER_HEADER=1` and ensure the client sets the `x-macp-agent-id` header. +**`UNAUTHENTICATED`**: Start the runtime without `MACP_AUTH_*` configured (so dev-mode auth is active) and ensure the client sets `Authorization: Bearer `. With auth resolvers configured, send a matching static bearer or JWT instead. **`INVALID_ENVELOPE` on SessionStart**: Verify that the mode name is canonical, the payload is not empty, and all four required fields (`mode_version`, `configuration_version`, `ttl_ms > 0`, `participants`) are present. diff --git a/docs/getting-started.md b/docs/getting-started.md index a669fe1..6fc1d52 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -32,15 +32,14 @@ cargo build ### Starting a development server -For local development, two environment variables disable TLS and enable a header-based identity shortcut. This lets you send requests without configuring tokens or certificates: +For local development, disable TLS and skip configuring tokens. With no auth resolvers configured, the runtime falls back to dev-mode auth: ```bash export MACP_ALLOW_INSECURE=1 -export MACP_ALLOW_DEV_SENDER_HEADER=1 cargo run ``` -The server listens on `127.0.0.1:50051` and trusts the `x-macp-agent-id` gRPC metadata header as the authenticated sender identity. +The server listens on `127.0.0.1:50051` and treats any `Authorization: Bearer ` header as authenticating the caller with sender identity ``. ### Starting a production server @@ -157,13 +156,13 @@ The session is now terminal. Any subsequent messages targeting it are rejected w ### Development mode -In development mode, clients set the `x-macp-agent-id` gRPC metadata header to declare their identity: +In development mode (no auth resolvers configured), clients send their sender identity as a bearer token: ``` -metadata: { "x-macp-agent-id": "agent://my-agent" } +metadata: { "authorization": "Bearer agent://my-agent" } ``` -The runtime trusts this header directly. This is only available when `MACP_ALLOW_DEV_SENDER_HEADER=1` is set. +The runtime accepts any bearer value as the sender identity. This fallback is only active while neither `MACP_AUTH_TOKENS_*` nor `MACP_AUTH_ISSUER` is set. ### Production mode @@ -191,13 +190,17 @@ Create a `tokens.json` file that maps bearer tokens to agent identities and capa Setting `allowed_modes` to an empty array grants access to all modes. The runtime derives the sender identity from the token, so agents cannot spoof their identity. Clients authenticate by sending `Authorization: Bearer ` in the gRPC metadata. +### JWT mode + +The runtime also accepts JWT bearer tokens when `MACP_AUTH_ISSUER` is set. Configure a JWKS source (`MACP_AUTH_JWKS_JSON` inline, or `MACP_AUTH_JWKS_URL` fetched + cached) and optionally `MACP_AUTH_AUDIENCE` (default `macp-runtime`) and `MACP_AUTH_JWKS_TTL_SECS` (default 300). The JWT's `sub` claim becomes the sender; an optional `macp_scopes` claim carries the same capability fields as the static token config (`allowed_modes`, `can_start_sessions`, `max_open_sessions`, `can_manage_mode_registry`, `is_observer`). + ## Running the example clients The repository includes example clients in `src/bin` that demonstrate each mode. Start the development server in one terminal, then run any example in another: ```bash # Terminal 1: start the server -export MACP_ALLOW_INSECURE=1 && export MACP_ALLOW_DEV_SENDER_HEADER=1 && cargo run +export MACP_ALLOW_INSECURE=1 && cargo run # Terminal 2: run examples cargo run --bin client # Decision mode @@ -213,7 +216,7 @@ cargo run --bin fuzz_client # Error path testing | Error | Cause | Fix | |-------|-------|-----| -| `UNAUTHENTICATED` | No valid credential provided | Set `MACP_ALLOW_DEV_SENDER_HEADER=1` and send the `x-macp-agent-id` header | +| `UNAUTHENTICATED` | No valid credential provided | In dev mode send `Authorization: Bearer `; in prod send a configured static bearer or a valid JWT | | `INVALID_ENVELOPE` | Missing required SessionStart fields | Ensure `participants`, `mode_version`, `configuration_version`, and `ttl_ms > 0` are all present | | `SESSION_NOT_OPEN` | Session already resolved or expired | Use `GetSession` to check state; start a new session | | `INVALID_SESSION_ID` | Session ID format not accepted | Use UUID v4/v7 or base64url (22+ characters) | diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md index 1a3995f..87db607 100644 --- a/docs/sdk-guide.md +++ b/docs/sdk-guide.md @@ -137,8 +137,8 @@ Unknown fields in protobuf messages are silently ignored, so SDKs built for prot ### Against a local runtime ```bash -MACP_ALLOW_INSECURE=1 MACP_ALLOW_DEV_SENDER_HEADER=1 cargo run -# SDK connects to localhost:50051 using x-macp-agent-id header +MACP_ALLOW_INSECURE=1 cargo run +# SDK connects to localhost:50051 sending Authorization: Bearer ``` ### Test checklist diff --git a/integration_tests/Cargo.lock b/integration_tests/Cargo.lock index 942f61a..31bfdaf 100644 --- a/integration_tests/Cargo.lock +++ b/integration_tests/Cargo.lock @@ -294,6 +294,15 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" +[[package]] +name = "deranged" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.10.7" @@ -967,6 +976,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1041,7 +1065,6 @@ dependencies = [ [[package]] name = "macp-proto" version = "0.1.0" -source = "git+https://github.com/multiagentcoordinationprotocol/multiagentcoordinationprotocol.git#07d80fc1052cc1c96eaf069a799889c1d204dc95" [[package]] name = "macp-runtime" @@ -1051,9 +1074,11 @@ dependencies = [ "async-trait", "chrono", "futures-core", + "jsonwebtoken", "macp-proto", "prost", "prost-types", + "reqwest 0.12.28", "serde", "serde_json", "thiserror 1.0.69", @@ -1156,6 +1181,31 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-conv" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1209,6 +1259,16 @@ dependencies = [ "windows-link", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64", + "serde_core", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -1261,6 +1321,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1556,6 +1622,38 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64", + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "reqwest" version = "0.13.2" @@ -1621,7 +1719,7 @@ dependencies = [ "nanoid", "ordered-float", "pin-project-lite", - "reqwest", + "reqwest 0.13.2", "schemars", "serde", "serde_json", @@ -1749,6 +1847,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "same-file" version = "1.0.6" @@ -1881,6 +1985,18 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.10.6" @@ -1917,6 +2033,18 @@ dependencies = [ "libc", ] +[[package]] +name = "simple_asn1" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror 2.0.18", + "time", +] + [[package]] name = "slab" version = "0.4.12" @@ -2065,6 +2193,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde_core", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + +[[package]] +name = "time-macros" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.3" diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index 4ef7107..6ae39cd 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -32,7 +32,7 @@ impl TestConfig { self.endpoint.is_none() } - /// Whether to use dev-mode headers (x-macp-agent-id) instead of bearer tokens. + /// Whether to use dev-mode auth (Authorization: Bearer ) instead of configured bearer tokens. pub fn use_dev_headers(&self) -> bool { self.auth_token.is_none() } diff --git a/integration_tests/src/helpers.rs b/integration_tests/src/helpers.rs index 260fd4e..4bda9fa 100644 --- a/integration_tests/src/helpers.rs +++ b/integration_tests/src/helpers.rs @@ -30,8 +30,10 @@ pub fn new_message_id() -> String { fn with_sender(sender: &str, inner: T) -> Request { let mut request = Request::new(inner); request.metadata_mut().insert( - "x-macp-agent-id", - sender.parse().expect("valid sender header"), + "authorization", + format!("Bearer {sender}") + .parse() + .expect("valid auth header"), ); request } @@ -64,7 +66,8 @@ pub fn session_start_payload(intent: &str, participants: &[&str], ttl_ms: i64) - configuration_version: CONFIG_VERSION.into(), policy_version: POLICY_VERSION.into(), ttl_ms, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec() diff --git a/integration_tests/src/server_manager.rs b/integration_tests/src/server_manager.rs index 55d9b7b..54f4a65 100644 --- a/integration_tests/src/server_manager.rs +++ b/integration_tests/src/server_manager.rs @@ -23,7 +23,6 @@ impl ServerManager { let child = Command::new(binary_path) .env("MACP_ALLOW_INSECURE", "1") - .env("MACP_ALLOW_DEV_SENDER_HEADER", "1") .env("MACP_MEMORY_ONLY", "1") .env("MACP_BIND_ADDR", &bind_addr) .env("RUST_LOG", "warn") diff --git a/integration_tests/tests/tier1_protocol/test_mode_registry.rs b/integration_tests/tests/tier1_protocol/test_mode_registry.rs index d800029..adc90ab 100644 --- a/integration_tests/tests/tier1_protocol/test_mode_registry.rs +++ b/integration_tests/tests/tier1_protocol/test_mode_registry.rs @@ -8,8 +8,10 @@ use tonic::Request; fn with_sender(sender: &str, inner: T) -> Request { let mut request = Request::new(inner); request.metadata_mut().insert( - "x-macp-agent-id", - sender.parse().expect("valid sender header"), + "authorization", + format!("Bearer {sender}") + .parse() + .expect("valid auth header"), ); request } diff --git a/integration_tests/tests/tier1_protocol/test_policy_registry.rs b/integration_tests/tests/tier1_protocol/test_policy_registry.rs index d70ddf9..4b113c5 100644 --- a/integration_tests/tests/tier1_protocol/test_policy_registry.rs +++ b/integration_tests/tests/tier1_protocol/test_policy_registry.rs @@ -10,8 +10,10 @@ use tonic::Request; fn with_sender(sender: &str, inner: T) -> Request { let mut request = Request::new(inner); request.metadata_mut().insert( - "x-macp-agent-id", - sender.parse().expect("valid sender header"), + "authorization", + format!("Bearer {sender}") + .parse() + .expect("valid auth header"), ); request } @@ -21,7 +23,7 @@ fn test_descriptor(policy_id: &str, mode: &str, rules_json: serde_json::Value) - policy_id: policy_id.into(), mode: mode.into(), description: format!("test policy {}", policy_id), - rules: serde_json::to_vec(&rules_json).unwrap(), + rules: serde_json::to_string(&rules_json).unwrap(), schema_version: 1, registered_at_unix_ms: 0, } @@ -282,7 +284,8 @@ async fn unknown_policy_version_rejects_session_start() { configuration_version: CONFIG_VERSION.into(), policy_version: "policy.nonexistent.v999".into(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); @@ -345,7 +348,8 @@ async fn policy_enforcement_blocks_commitment_in_decision_mode() { configuration_version: CONFIG_VERSION.into(), policy_version: policy_id.clone(), ttl_ms: 60_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); diff --git a/integration_tests/tests/tier1_protocol/test_rfc_cross_cutting.rs b/integration_tests/tests/tier1_protocol/test_rfc_cross_cutting.rs index 6a79cf7..791a1d4 100644 --- a/integration_tests/tests/tier1_protocol/test_rfc_cross_cutting.rs +++ b/integration_tests/tests/tier1_protocol/test_rfc_cross_cutting.rs @@ -14,8 +14,10 @@ use tonic::Request; fn with_sender(sender: &str, inner: T) -> Request { let mut request = Request::new(inner); request.metadata_mut().insert( - "x-macp-agent-id", - sender.parse().expect("valid sender header"), + "authorization", + format!("Bearer {sender}") + .parse() + .expect("valid auth header"), ); request } diff --git a/integration_tests/tests/tier1_protocol/test_validation_gaps.rs b/integration_tests/tests/tier1_protocol/test_validation_gaps.rs index 6817215..aed635a 100644 --- a/integration_tests/tests/tier1_protocol/test_validation_gaps.rs +++ b/integration_tests/tests/tier1_protocol/test_validation_gaps.rs @@ -11,8 +11,10 @@ use tonic::Request; fn with_sender(sender: &str, inner: T) -> Request { let mut request = Request::new(inner); request.metadata_mut().insert( - "x-macp-agent-id", - sender.parse().expect("valid sender header"), + "authorization", + format!("Bearer {sender}") + .parse() + .expect("valid auth header"), ); request } @@ -214,7 +216,8 @@ async fn session_start_too_many_participants_rejected() { configuration_version: CONFIG_VERSION.into(), policy_version: POLICY_VERSION.into(), ttl_ms: 30_000, - context: vec![], + context_id: String::new(), + extensions: std::collections::HashMap::new(), roots: vec![], } .encode_to_vec(); diff --git a/integration_tests/tests/tier3_e2e/test_e2e_decision_with_signals.rs b/integration_tests/tests/tier3_e2e/test_e2e_decision_with_signals.rs index eb2f148..787ad88 100644 --- a/integration_tests/tests/tier3_e2e/test_e2e_decision_with_signals.rs +++ b/integration_tests/tests/tier3_e2e/test_e2e_decision_with_signals.rs @@ -11,8 +11,10 @@ use tonic::Request; fn with_sender(sender: &str, inner: T) -> Request { let mut request = Request::new(inner); request.metadata_mut().insert( - "x-macp-agent-id", - sender.parse().expect("valid sender header"), + "authorization", + format!("Bearer {sender}") + .parse() + .expect("valid auth header"), ); request } From 4f19cd85d402f6398d2f4f5408e1e946bbd5cf7d Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Sat, 18 Apr 2026 15:09:59 -0700 Subject: [PATCH 3/4] Add Auth Integration Tests --- Cargo.lock | 5 +- Cargo.toml | 5 +- integration_tests/Cargo.lock | 6 +- integration_tests/Cargo.toml | 4 + integration_tests/src/server_manager.rs | 21 +- integration_tests/tests/tier1_jwt.rs | 364 ++++++++++++++++++++++++ src/auth/resolvers/jwt_bearer.rs | 235 ++++++++++++++- 7 files changed, 628 insertions(+), 12 deletions(-) create mode 100644 integration_tests/tests/tier1_jwt.rs diff --git a/Cargo.lock b/Cargo.lock index 2cafbad..f0c14f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1082,7 +1082,9 @@ dependencies = [ [[package]] name = "macp-proto" -version = "0.1.0" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca7b5c6a2f0de7adb7c6159b00b11f8385eb216b0e68c74f2ef7a95fbd980ea" [[package]] name = "macp-runtime" @@ -1090,6 +1092,7 @@ version = "0.4.0" dependencies = [ "async-stream", "async-trait", + "base64 0.22.1", "chrono", "futures-core", "jsonwebtoken", diff --git a/Cargo.toml b/Cargo.toml index 5127e7e..d3538d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,12 +36,11 @@ opentelemetry-otlp = { version = "0.15", features = ["tonic"], optional = true } tracing-opentelemetry = { version = "0.23", optional = true } rocksdb = { version = "0.22", optional = true } redis = { version = "0.27", features = ["tokio-comp", "aio"], optional = true } -# Proto definitions — local path for WatchSessions development. -# Switch back to crates.io after publishing: macp-proto = "0.2.0" -macp-proto = { path = "../multiagentcoordinationprotocol/packages/proto-rust" } +macp-proto = "0.1.1" [dev-dependencies] tempfile = "3" +base64 = "0.22" [build-dependencies] tonic-prost-build = "0.14" diff --git a/integration_tests/Cargo.lock b/integration_tests/Cargo.lock index 31bfdaf..ff2fb7c 100644 --- a/integration_tests/Cargo.lock +++ b/integration_tests/Cargo.lock @@ -1048,7 +1048,9 @@ version = "0.0.0" dependencies = [ "anyhow", "async-trait", + "base64", "chrono", + "jsonwebtoken", "macp-runtime", "prost", "rig-core", @@ -1064,7 +1066,9 @@ dependencies = [ [[package]] name = "macp-proto" -version = "0.1.0" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca7b5c6a2f0de7adb7c6159b00b11f8385eb216b0e68c74f2ef7a95fbd980ea" [[package]] name = "macp-runtime" diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 9585322..240459e 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -23,3 +23,7 @@ thiserror = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } async-trait = "0.1" + +[dev-dependencies] +jsonwebtoken = "9" +base64 = "0.22" diff --git a/integration_tests/src/server_manager.rs b/integration_tests/src/server_manager.rs index 54f4a65..6a0ac3f 100644 --- a/integration_tests/src/server_manager.rs +++ b/integration_tests/src/server_manager.rs @@ -15,17 +15,30 @@ pub struct ServerManager { impl ServerManager { /// Start a runtime server on a free port. Returns once the server is accepting connections. pub async fn start(binary_path: &str) -> Result { + Self::start_with_env(binary_path, &[]).await + } + + /// Start with additional process env vars (e.g. to enable JWT auth). + pub async fn start_with_env(binary_path: &str, extra_env: &[(&str, &str)]) -> Result { let port = find_free_port()?; let bind_addr = format!("127.0.0.1:{port}"); let endpoint = format!("http://{bind_addr}"); tracing::info!("Starting MACP runtime: {binary_path} on {bind_addr}"); - let child = Command::new(binary_path) - .env("MACP_ALLOW_INSECURE", "1") + let mut cmd = Command::new(binary_path); + cmd.env("MACP_ALLOW_INSECURE", "1") .env("MACP_MEMORY_ONLY", "1") - .env("MACP_BIND_ADDR", &bind_addr) - .env("RUST_LOG", "warn") + .env("MACP_BIND_ADDR", &bind_addr); + // Allow callers to override RUST_LOG via extra_env; default to warn. + let has_rust_log = extra_env.iter().any(|(k, _)| *k == "RUST_LOG"); + if !has_rust_log { + cmd.env("RUST_LOG", "warn"); + } + for (k, v) in extra_env { + cmd.env(k, v); + } + let child = cmd .spawn() .with_context(|| format!("failed to start runtime binary: {binary_path}"))?; diff --git a/integration_tests/tests/tier1_jwt.rs b/integration_tests/tests/tier1_jwt.rs new file mode 100644 index 0000000..3fd77f9 --- /dev/null +++ b/integration_tests/tests/tier1_jwt.rs @@ -0,0 +1,364 @@ +//! Tier 1 — JWT auth integration tests through the real gRPC boundary. +//! +//! Spins up a dedicated runtime subprocess with `MACP_AUTH_ISSUER` + +//! `MACP_AUTH_JWKS_JSON` set (inline symmetric HS256 JWKS) and verifies: +//! * Valid signed JWT → accepted, sender derived from `sub` claim +//! * `macp_scopes` claim is honored (observer flag grants read-any-session) +//! * Expired JWT → UNAUTHENTICATED +//! * Wrong issuer → UNAUTHENTICATED +//! * Wrong signature → UNAUTHENTICATED +//! * No bearer header → UNAUTHENTICATED +//! * Opaque (non-JWT) bearer → UNAUTHENTICATED (JWT is the only resolver) + +use base64::Engine; +use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; +use macp_integration_tests::server_manager::ServerManager; +use macp_runtime::pb::macp_runtime_service_client::MacpRuntimeServiceClient; +use macp_runtime::pb::{ + Envelope, GetSessionRequest, SendRequest, SessionStartPayload, +}; +use prost::Message; +use serde::Serialize; +use std::sync::OnceLock; +use tokio::sync::Mutex; +use tonic::transport::Channel; +use tonic::Request; + +const ISSUER: &str = "https://issuer.test"; +const AUDIENCE: &str = "macp-runtime"; +const SECRET: &[u8] = b"integration-test-hmac-secret-32b"; + +const MODE_DECISION: &str = "macp.mode.decision.v1"; +const MODE_VERSION: &str = "1.0.0"; +const CONFIG_VERSION: &str = "config.default"; + +static JWT_SERVER: OnceLock>> = OnceLock::new(); +static JWT_ENDPOINT: OnceLock = OnceLock::new(); + +fn jwks_json() -> String { + let k = base64::engine::general_purpose::STANDARD.encode(SECRET); + serde_json::json!({ "keys": [ { "kty": "oct", "alg": "HS256", "k": k } ] }) + .to_string() +} + +async fn endpoint() -> &'static str { + if let Some(ep) = JWT_ENDPOINT.get() { + return ep.as_str(); + } + let binary = std::env::var("MACP_TEST_BINARY").unwrap_or_else(|_| { + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + format!("{manifest_dir}/../target/debug/macp-runtime") + }); + let jwks = jwks_json(); + let manager = ServerManager::start_with_env( + &binary, + &[ + ("MACP_AUTH_ISSUER", ISSUER), + ("MACP_AUTH_AUDIENCE", AUDIENCE), + ("MACP_AUTH_JWKS_JSON", jwks.as_str()), + ], + ) + .await + .expect("failed to start JWT-configured MACP runtime"); + let ep = manager.endpoint.clone(); + let _ = JWT_ENDPOINT.set(ep); + let _ = JWT_SERVER.set(Mutex::new(Some(manager))); + JWT_ENDPOINT.get().unwrap().as_str() +} + +async fn client() -> MacpRuntimeServiceClient { + let ep = endpoint().await; + MacpRuntimeServiceClient::connect(ep.to_string()) + .await + .expect("failed to connect") +} + +#[derive(Serialize)] +struct Claims<'a> { + sub: &'a str, + iss: &'a str, + aud: &'a str, + exp: i64, + #[serde(skip_serializing_if = "Option::is_none")] + macp_scopes: Option, +} + +fn sign_with_secret(secret: &[u8], claims: &Claims) -> String { + encode( + &Header::new(Algorithm::HS256), + claims, + &EncodingKey::from_secret(secret), + ) + .unwrap() +} + +fn sign(claims: &Claims) -> String { + sign_with_secret(SECRET, claims) +} + +fn with_bearer(token: &str, inner: T) -> Request { + let mut req = Request::new(inner); + req.metadata_mut().insert( + "authorization", + format!("Bearer {token}").parse().unwrap(), + ); + req +} + +fn new_session_id() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() +} + +fn new_message_id() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() +} + +fn session_start_envelope(sid: &str, participants: &[&str]) -> Envelope { + let payload = SessionStartPayload { + intent: "JWT auth test".into(), + participants: participants.iter().map(|s| s.to_string()).collect(), + mode_version: MODE_VERSION.into(), + configuration_version: CONFIG_VERSION.into(), + policy_version: String::new(), + ttl_ms: 60_000, + context_id: String::new(), + extensions: std::collections::HashMap::new(), + roots: vec![], + } + .encode_to_vec(); + Envelope { + macp_version: "1.0".into(), + mode: MODE_DECISION.into(), + message_type: "SessionStart".into(), + message_id: new_message_id(), + session_id: sid.into(), + sender: String::new(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload, + } +} + +fn now_plus(secs: i64) -> i64 { + chrono::Utc::now().timestamp() + secs +} + +// ── Success paths ──────────────────────────────────────────────── + +#[tokio::test] +async fn valid_jwt_authenticates_and_derives_sender_from_sub() { + let mut c = client().await; + let token = sign(&Claims { + sub: "agent://jwt-alice", + iss: ISSUER, + aud: AUDIENCE, + exp: now_plus(300), + macp_scopes: None, + }); + + let sid = new_session_id(); + let env = session_start_envelope(&sid, &["agent://jwt-alice", "agent://jwt-bob"]); + let resp = c + .send(with_bearer( + &token, + SendRequest { + envelope: Some(env), + }, + )) + .await + .expect("send succeeded"); + let ack = resp.into_inner().ack.unwrap(); + assert!(ack.ok, "ack error: {:?}", ack.error); + + // Verify the runtime used `sub` as the authenticated sender. + let get = c + .get_session(with_bearer( + &token, + GetSessionRequest { + session_id: sid.clone(), + }, + )) + .await + .unwrap(); + let meta = get.into_inner().metadata.unwrap(); + assert_eq!(meta.initiator, "agent://jwt-alice"); + assert_eq!(meta.session_id, sid); +} + +#[tokio::test] +async fn observer_scope_can_get_session_for_non_participant() { + let mut c = client().await; + + // Owner starts a session excluding the observer. + let owner_token = sign(&Claims { + sub: "agent://jwt-owner", + iss: ISSUER, + aud: AUDIENCE, + exp: now_plus(300), + macp_scopes: None, + }); + let sid = new_session_id(); + let env = session_start_envelope(&sid, &["agent://jwt-owner", "agent://jwt-peer"]); + let resp = c + .send(with_bearer( + &owner_token, + SendRequest { + envelope: Some(env), + }, + )) + .await + .unwrap(); + assert!(resp.into_inner().ack.unwrap().ok); + + // Observer (not a participant) should still be able to read via is_observer scope. + let observer_token = sign(&Claims { + sub: "agent://jwt-observer", + iss: ISSUER, + aud: AUDIENCE, + exp: now_plus(300), + macp_scopes: Some(serde_json::json!({ "is_observer": true })), + }); + let get = c + .get_session(with_bearer( + &observer_token, + GetSessionRequest { + session_id: sid.clone(), + }, + )) + .await + .expect("observer can read any session"); + assert_eq!(get.into_inner().metadata.unwrap().session_id, sid); + + // A non-observer non-participant should be denied. + let outsider_token = sign(&Claims { + sub: "agent://jwt-outsider", + iss: ISSUER, + aud: AUDIENCE, + exp: now_plus(300), + macp_scopes: None, + }); + let err = c + .get_session(with_bearer( + &outsider_token, + GetSessionRequest { session_id: sid }, + )) + .await + .unwrap_err(); + assert_eq!(err.code(), tonic::Code::PermissionDenied); +} + +// ── Rejection paths ────────────────────────────────────────────── + +fn assert_unauthenticated_ack(ack: macp_runtime::pb::Ack) { + assert!(!ack.ok, "expected auth failure, got ok ack"); + let err = ack.error.expect("auth failure must carry an error"); + assert_eq!( + err.code, "UNAUTHENTICATED", + "expected UNAUTHENTICATED, got {err:?}" + ); +} + +#[tokio::test] +async fn expired_jwt_is_unauthenticated() { + let mut c = client().await; + let token = sign(&Claims { + sub: "agent://jwt-alice", + iss: ISSUER, + aud: AUDIENCE, + exp: now_plus(-600), + macp_scopes: None, + }); + let sid = new_session_id(); + let env = session_start_envelope(&sid, &["agent://jwt-alice"]); + let resp = c + .send(with_bearer( + &token, + SendRequest { + envelope: Some(env), + }, + )) + .await + .expect("RPC returned ack"); + assert_unauthenticated_ack(resp.into_inner().ack.unwrap()); +} + +#[tokio::test] +async fn wrong_issuer_is_unauthenticated() { + let mut c = client().await; + let token = sign(&Claims { + sub: "agent://jwt-alice", + iss: "https://attacker.example", + aud: AUDIENCE, + exp: now_plus(300), + macp_scopes: None, + }); + let sid = new_session_id(); + let env = session_start_envelope(&sid, &["agent://jwt-alice"]); + let resp = c + .send(with_bearer( + &token, + SendRequest { + envelope: Some(env), + }, + )) + .await + .expect("RPC returned ack"); + assert_unauthenticated_ack(resp.into_inner().ack.unwrap()); +} + +#[tokio::test] +async fn forged_signature_is_unauthenticated() { + let mut c = client().await; + let forged = sign_with_secret( + b"not-the-server-secret-0123456789", + &Claims { + sub: "agent://jwt-alice", + iss: ISSUER, + aud: AUDIENCE, + exp: now_plus(300), + macp_scopes: None, + }, + ); + let sid = new_session_id(); + let env = session_start_envelope(&sid, &["agent://jwt-alice"]); + let resp = c + .send(with_bearer( + &forged, + SendRequest { + envelope: Some(env), + }, + )) + .await + .expect("RPC returned ack"); + assert_unauthenticated_ack(resp.into_inner().ack.unwrap()); +} + +#[tokio::test] +async fn missing_authorization_is_unauthenticated() { + let mut c = client().await; + let sid = new_session_id(); + let env = session_start_envelope(&sid, &["agent://jwt-alice"]); + let resp = c + .send(Request::new(SendRequest { + envelope: Some(env), + })) + .await + .expect("RPC returned ack"); + assert_unauthenticated_ack(resp.into_inner().ack.unwrap()); +} + +#[tokio::test] +async fn opaque_bearer_is_unauthenticated_when_only_jwt_is_configured() { + let mut c = client().await; + let sid = new_session_id(); + let env = session_start_envelope(&sid, &["agent://jwt-alice"]); + let resp = c + .send(with_bearer( + "static-opaque-token-with-no-dots", + SendRequest { + envelope: Some(env), + }, + )) + .await + .expect("RPC returned ack"); + assert_unauthenticated_ack(resp.into_inner().ack.unwrap()); +} diff --git a/src/auth/resolvers/jwt_bearer.rs b/src/auth/resolvers/jwt_bearer.rs index c13380d..3579227 100644 --- a/src/auth/resolvers/jwt_bearer.rs +++ b/src/auth/resolvers/jwt_bearer.rs @@ -1,5 +1,5 @@ use crate::auth::resolver::{AuthError, AuthResolver, ResolvedIdentity}; -use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation}; +use jsonwebtoken::{decode, decode_header, Algorithm, DecodingKey, Validation}; use serde::Deserialize; use std::sync::Arc; use tokio::sync::RwLock; @@ -197,10 +197,23 @@ impl AuthResolver for JwtBearerResolver { let keys = self.get_keys().await?; - let mut validation = Validation::new(Algorithm::RS256); + // Inspect the token header to pick a single algorithm to validate against. + // jsonwebtoken 9 requires every algorithm in validation.algorithms to match + // the DecodingKey's family, so a mixed list (RS256 + HS256) with one key + // would always fail with InvalidAlgorithm. We still gate on the configured + // allowlist — if the token's alg isn't configured, we reject it. + let header = decode_header(&token) + .map_err(|e| AuthError::InvalidCredential(format!("malformed JWT header: {e}")))?; + if !self.config.algorithms.contains(&header.alg) { + return Err(AuthError::InvalidCredential(format!( + "JWT algorithm {:?} is not in the configured allowlist", + header.alg + ))); + } + let mut validation = Validation::new(header.alg); validation.set_issuer(&[&self.config.issuer]); validation.set_audience(&[&self.config.audience]); - validation.algorithms = self.config.algorithms.clone(); + validation.algorithms = vec![header.alg]; let mut last_err = None; for key in &keys { @@ -248,3 +261,219 @@ impl AuthResolver for JwtBearerResolver { } } } + +#[cfg(test)] +mod tests { + use super::*; + use base64::Engine; + use jsonwebtoken::{encode, EncodingKey, Header}; + use serde::Serialize; + + const ISSUER: &str = "https://issuer.test"; + const AUDIENCE: &str = "macp-runtime"; + const SECRET: &[u8] = b"super-secret-symmetric-key-32-by"; + + #[derive(Serialize)] + struct TestClaims<'a> { + sub: &'a str, + iss: &'a str, + aud: &'a str, + exp: i64, + #[serde(skip_serializing_if = "Option::is_none")] + macp_scopes: Option, + } + + fn jwks_inline() -> String { + let k = base64::engine::general_purpose::STANDARD.encode(SECRET); + serde_json::json!({ + "keys": [ + { "kty": "oct", "alg": "HS256", "k": k } + ] + }) + .to_string() + } + + fn config() -> JwtConfig { + JwtConfig { + issuer: ISSUER.to_string(), + audience: AUDIENCE.to_string(), + algorithms: vec![Algorithm::HS256], + } + } + + fn sign(claims: &TestClaims) -> String { + let mut header = Header::new(Algorithm::HS256); + header.kid = Some("test-key".into()); + encode(&header, claims, &EncodingKey::from_secret(SECRET)).unwrap() + } + + fn bearer(token: &str) -> MetadataMap { + let mut m = MetadataMap::new(); + m.insert("authorization", format!("Bearer {token}").parse().unwrap()); + m + } + + #[tokio::test] + async fn valid_jwt_resolves_to_identity_with_scopes() { + let resolver = JwtBearerResolver::from_inline_json(config(), &jwks_inline()).unwrap(); + let token = sign(&TestClaims { + sub: "agent://alice", + iss: ISSUER, + aud: AUDIENCE, + exp: (chrono::Utc::now().timestamp() + 300), + macp_scopes: Some(serde_json::json!({ + "allowed_modes": ["macp.mode.decision.v1"], + "can_start_sessions": true, + "max_open_sessions": 5, + "can_manage_mode_registry": false, + "is_observer": false, + })), + }); + + let id = resolver + .resolve(&bearer(&token)) + .await + .expect("ok") + .expect("some"); + assert_eq!(id.sender, "agent://alice"); + assert_eq!(id.resolver, "jwt_bearer"); + assert!(id.can_start_sessions); + assert_eq!(id.max_open_sessions, Some(5)); + assert!(!id.can_manage_mode_registry); + assert!(!id.is_observer); + let modes = id.allowed_modes.unwrap(); + assert!(modes.contains("macp.mode.decision.v1")); + } + + #[tokio::test] + async fn jwt_without_scopes_defaults_to_permissive_sender() { + let resolver = JwtBearerResolver::from_inline_json(config(), &jwks_inline()).unwrap(); + let token = sign(&TestClaims { + sub: "agent://bob", + iss: ISSUER, + aud: AUDIENCE, + exp: (chrono::Utc::now().timestamp() + 300), + macp_scopes: None, + }); + let id = resolver.resolve(&bearer(&token)).await.unwrap().unwrap(); + assert_eq!(id.sender, "agent://bob"); + assert!(id.can_start_sessions); // default when unspecified + assert!(id.allowed_modes.is_none()); + assert!(!id.is_observer); + } + + #[tokio::test] + async fn expired_jwt_returns_expired_error() { + let resolver = JwtBearerResolver::from_inline_json(config(), &jwks_inline()).unwrap(); + // Exceed the default 60s leeway applied by jsonwebtoken's Validation. + let token = sign(&TestClaims { + sub: "agent://alice", + iss: ISSUER, + aud: AUDIENCE, + exp: (chrono::Utc::now().timestamp() - 600), + macp_scopes: None, + }); + let err = resolver.resolve(&bearer(&token)).await.unwrap_err(); + assert!(matches!(err, AuthError::Expired), "got {err:?}"); + } + + #[tokio::test] + async fn wrong_issuer_rejected() { + let resolver = JwtBearerResolver::from_inline_json(config(), &jwks_inline()).unwrap(); + let token = sign(&TestClaims { + sub: "agent://alice", + iss: "https://other.example", + aud: AUDIENCE, + exp: (chrono::Utc::now().timestamp() + 300), + macp_scopes: None, + }); + let err = resolver.resolve(&bearer(&token)).await.unwrap_err(); + assert!( + matches!(err, AuthError::InvalidCredential(ref m) if m.contains("issuer")), + "got {err:?}" + ); + } + + #[tokio::test] + async fn wrong_audience_rejected() { + let resolver = JwtBearerResolver::from_inline_json(config(), &jwks_inline()).unwrap(); + let token = sign(&TestClaims { + sub: "agent://alice", + iss: ISSUER, + aud: "other-audience", + exp: (chrono::Utc::now().timestamp() + 300), + macp_scopes: None, + }); + let err = resolver.resolve(&bearer(&token)).await.unwrap_err(); + assert!( + matches!(err, AuthError::InvalidCredential(ref m) if m.contains("audience")), + "got {err:?}" + ); + } + + #[tokio::test] + async fn bad_signature_rejected() { + let resolver = JwtBearerResolver::from_inline_json(config(), &jwks_inline()).unwrap(); + // Sign with a different key — signature won't verify. + let claims = TestClaims { + sub: "agent://alice", + iss: ISSUER, + aud: AUDIENCE, + exp: (chrono::Utc::now().timestamp() + 300), + macp_scopes: None, + }; + let bad_token = encode( + &Header::new(Algorithm::HS256), + &claims, + &EncodingKey::from_secret(b"different-key-bytes-0123456789!!"), + ) + .unwrap(); + let err = resolver.resolve(&bearer(&bad_token)).await.unwrap_err(); + assert!( + matches!(err, AuthError::InvalidCredential(_)), + "got {err:?}" + ); + } + + #[tokio::test] + async fn opaque_bearer_token_is_not_claimed() { + let resolver = JwtBearerResolver::from_inline_json(config(), &jwks_inline()).unwrap(); + // No dots → not JWT-shaped → defer to next resolver. + let outcome = resolver + .resolve(&bearer("static-opaque-token")) + .await + .unwrap(); + assert!(outcome.is_none()); + } + + #[tokio::test] + async fn missing_authorization_header_is_not_claimed() { + let resolver = JwtBearerResolver::from_inline_json(config(), &jwks_inline()).unwrap(); + let outcome = resolver.resolve(&MetadataMap::new()).await.unwrap(); + assert!(outcome.is_none()); + } + + #[tokio::test] + async fn server_env_algorithms_accept_hs256_tokens() { + // Reproduce the server's SecurityLayer::from_env() config: algorithms = RS256/ES256/HS256. + let cfg = JwtConfig { + issuer: ISSUER.to_string(), + audience: AUDIENCE.to_string(), + algorithms: vec![Algorithm::RS256, Algorithm::ES256, Algorithm::HS256], + }; + let resolver = JwtBearerResolver::from_inline_json(cfg, &jwks_inline()).unwrap(); + let token = sign(&TestClaims { + sub: "agent://alice", + iss: ISSUER, + aud: AUDIENCE, + exp: (chrono::Utc::now().timestamp() + 300), + macp_scopes: None, + }); + let id = resolver + .resolve(&bearer(&token)) + .await + .expect("ok") + .expect("some"); + assert_eq!(id.sender, "agent://alice"); + } +} From 618cbc09c2250da2d8c1b89536885e23129506d5 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Sat, 18 Apr 2026 15:22:37 -0700 Subject: [PATCH 4/4] Fix Rust Sec Warnings --- .cargo/audit.toml | 16 ++++++++++++++++ Cargo.lock | 4 ++-- 2 files changed, 18 insertions(+), 2 deletions(-) create mode 100644 .cargo/audit.toml diff --git a/.cargo/audit.toml b/.cargo/audit.toml new file mode 100644 index 0000000..9b47ed5 --- /dev/null +++ b/.cargo/audit.toml @@ -0,0 +1,16 @@ +# cargo-audit configuration for macp-runtime +# +# Advisories listed here are reviewed and temporarily accepted. +# Revisit each one when the blocking upstream dependency updates. + +[advisories] +ignore = [ + # RUSTSEC-2026-0097 — `rand 0.8.5` is unsound when a custom logger uses `rand::rng()`. + # Pulled in transitively via the `otel` feature (opentelemetry 0.22 → opentelemetry_sdk, + # tracing-opentelemetry 0.23, opentelemetry-otlp 0.15, which in turn drag tonic 0.11 + # → tower 0.4 → rand 0.8). This is an *unsoundness warning*, not a runtime CVE, and + # the `otel` feature is not in default-features. Clearing it requires upgrading the + # opentelemetry stack from 0.22 → 0.31 (major API churn across versions). Tracked + # separately; revisit when the otel stack is upgraded. + "RUSTSEC-2026-0097", +] diff --git a/Cargo.lock b/Cargo.lock index f0c14f5..a28279f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1760,9 +1760,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.10" +version = "0.103.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" +checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" dependencies = [ "ring", "rustls-pki-types",