From c7b828d8cbbe706ad3e8786511af0c63dbdd90d9 Mon Sep 17 00:00:00 2001 From: haileyesus2433 Date: Wed, 27 May 2026 04:03:32 -0400 Subject: [PATCH 1/6] feat(jans-cedarling): implement policy store refresh worker and cache validation - Added a new `policy_store_refresh` module to handle background refresh of remote policy stores. - Introduced `CacheValidators` to parse and manage HTTP cache headers for conditional GET requests. - Updated `MetricsCollector` to track policy store refresh attempts and outcomes. - Updated `PolicyStoreConfig` to include a refresh interval setting for URL-based sources. - Implemented logic to handle `304 Not Modified` responses efficiently, ensuring the system remains responsive to policy updates. Signed-off-by: haileyesus2433 --- jans-cedarling/cedarling/src/authz/metrics.rs | 73 ++- .../bootstrap_config/policy_store_config.rs | 39 +- .../cedarling/src/http/cache_headers.rs | 303 ++++++++++ jans-cedarling/cedarling/src/http/mod.rs | 158 +++++- .../cedarling/src/init/policy_store.rs | 65 +++ .../src/init/policy_store_refresh.rs | 522 ++++++++++++++++++ .../cedarling/src/init/service_factory.rs | 8 + 7 files changed, 1165 insertions(+), 3 deletions(-) create mode 100644 jans-cedarling/cedarling/src/http/cache_headers.rs create mode 100644 jans-cedarling/cedarling/src/init/policy_store_refresh.rs diff --git a/jans-cedarling/cedarling/src/authz/metrics.rs b/jans-cedarling/cedarling/src/authz/metrics.rs index 62d10913d7f..123c28f5344 100644 --- a/jans-cedarling/cedarling/src/authz/metrics.rs +++ b/jans-cedarling/cedarling/src/authz/metrics.rs @@ -275,6 +275,17 @@ pub(crate) struct MetricsCollector { /// Swapped wholesale on each snapshot; read lock for record_*, write lock for snapshot interval: RwLock>, + + // These counters are emitted on every snapshot regardless of telemetry + // interval (they reflect long-running worker state, not per-interval + // activity). See `crate::init::policy_store_refresh`. + policy_store_refresh_last_attempt_secs: AtomicI64, + policy_store_refresh_last_success_secs: AtomicI64, + policy_store_refresh_consecutive_failures: AtomicI64, + /// Integer-enum encoding of the last [`crate::init::policy_store_refresh::RefreshOutcome`] + /// `0` means "no attempt yet"; see `RefreshOutcome` for the + /// per-outcome values. + policy_store_refresh_last_outcome: AtomicI64, } impl MetricsCollector { @@ -285,6 +296,10 @@ impl MetricsCollector { init_time: now, policy_count: AtomicI64::new(saturating_usize_to_i64(initial_policy_count)), interval: RwLock::new(Box::new(IntervalState::new(now))), + policy_store_refresh_last_attempt_secs: AtomicI64::new(0), + policy_store_refresh_last_success_secs: AtomicI64::new(0), + policy_store_refresh_consecutive_failures: AtomicI64::new(0), + policy_store_refresh_last_outcome: AtomicI64::new(0), } } @@ -294,6 +309,40 @@ impl MetricsCollector { init_time: Utc::now(), policy_count: AtomicI64::new(0), interval: RwLock::new(Box::new(IntervalState::new(Utc::now()))), + policy_store_refresh_last_attempt_secs: AtomicI64::new(0), + policy_store_refresh_last_success_secs: AtomicI64::new(0), + policy_store_refresh_consecutive_failures: AtomicI64::new(0), + policy_store_refresh_last_outcome: AtomicI64::new(0), + } + } + + /// Records a refresh-worker tick outcome. Always runs regardless of + /// `enabled`, since refresh state should be observable even if telemetry + /// emission to Lock is disabled. + pub(crate) fn record_policy_store_refresh( + &self, + outcome: crate::init::policy_store_refresh::RefreshOutcome, + ) { + let now_secs = Utc::now().timestamp(); + self.policy_store_refresh_last_attempt_secs + .store(now_secs, Ordering::Relaxed); + self.policy_store_refresh_last_outcome + .store(outcome as i64, Ordering::Relaxed); + + match outcome { + crate::init::policy_store_refresh::RefreshOutcome::Success + | crate::init::policy_store_refresh::RefreshOutcome::NotModified => { + self.policy_store_refresh_last_success_secs + .store(now_secs, Ordering::Relaxed); + self.policy_store_refresh_consecutive_failures + .store(0, Ordering::Relaxed); + }, + crate::init::policy_store_refresh::RefreshOutcome::HttpError + | crate::init::policy_store_refresh::RefreshOutcome::NetworkError + | crate::init::policy_store_refresh::RefreshOutcome::ParseError => { + self.policy_store_refresh_consecutive_failures + .fetch_add(1, Ordering::Relaxed); + }, } } @@ -555,7 +604,29 @@ impl MetricsCollector { .expect(ERROR_COUNTERS_LOCK_POISONED) .clone(); - let ops = old.to_operational_stats(now, self.init_time, &self.policy_count); + let mut ops = old.to_operational_stats(now, self.init_time, &self.policy_count); + + // Inject policy-store refresh state into the snapshot. + ops.insert( + "policy_store_refresh.last_attempt_secs".to_string(), + self.policy_store_refresh_last_attempt_secs + .load(Ordering::Relaxed), + ); + ops.insert( + "policy_store_refresh.last_success_secs".to_string(), + self.policy_store_refresh_last_success_secs + .load(Ordering::Relaxed), + ); + ops.insert( + "policy_store_refresh.consecutive_failures".to_string(), + self.policy_store_refresh_consecutive_failures + .load(Ordering::Relaxed), + ); + ops.insert( + "policy_store_refresh.last_outcome".to_string(), + self.policy_store_refresh_last_outcome + .load(Ordering::Relaxed), + ); MetricsSnapshot { policy_stats, diff --git a/jans-cedarling/cedarling/src/bootstrap_config/policy_store_config.rs b/jans-cedarling/cedarling/src/bootstrap_config/policy_store_config.rs index c527c257a3f..f8f668e646c 100644 --- a/jans-cedarling/cedarling/src/bootstrap_config/policy_store_config.rs +++ b/jans-cedarling/cedarling/src/bootstrap_config/policy_store_config.rs @@ -15,6 +15,40 @@ use crate::bootstrap_config::BootstrapConfigLoadingError; pub struct PolicyStoreConfig { /// Specifies the source from which the policy will be read. pub source: PolicyStoreSource, + + /// Base refresh interval in seconds for URL-based policy store sources + /// (`CjarUrl`, `LockServer`). `0` disables background refresh and preserves + /// the load-once-at-startup behavior. Ignored for local sources. A server + /// `Cache-Control: max-age` / `Expires` hint may *shorten* the next + /// interval but never lengthens it. + #[serde(default)] + pub refresh_interval_secs: u64, +} + +impl PolicyStoreConfig { + /// Minimum refresh interval, in seconds — anything smaller is clamped up to + /// this value to avoid a busy-poll against the upstream. + pub const MIN_REFRESH_INTERVAL_SECS: u64 = 5; + + /// True if the source is a remote URL and refresh is enabled. + pub fn refresh_enabled(&self) -> bool { + self.refresh_interval_secs > 0 + && matches!( + self.source, + PolicyStoreSource::CjarUrl(_) | PolicyStoreSource::LockServer(_) + ) + } +} + +impl Default for PolicyStoreConfig { + fn default() -> Self { + Self { + source: PolicyStoreSource::Yaml( + "cedar_version: v4.0.0\npolicy_stores: {}\n".to_string(), + ), + refresh_interval_secs: 0, + } + } } /// Raw policy store config @@ -129,6 +163,9 @@ impl TryFrom for PolicyStoreConfig { ), _ => PolicyStoreSource::FileYaml("policy-store.yaml".into()), }; - Ok(Self { source }) + Ok(Self { + source, + ..Default::default() + }) } } diff --git a/jans-cedarling/cedarling/src/http/cache_headers.rs b/jans-cedarling/cedarling/src/http/cache_headers.rs new file mode 100644 index 00000000000..a3b816fd47c --- /dev/null +++ b/jans-cedarling/cedarling/src/http/cache_headers.rs @@ -0,0 +1,303 @@ +// This software is available under the Apache-2.0 license. +// See https://www.apache.org/licenses/LICENSE-2.0.txt for full text. +// +// Copyright (c) 2024, Gluu, Inc. + +//! Parses HTTP cache validation headers used by the policy-store refresh worker. +//! +//! Recognized headers (RFC 7234 / RFC 9111): +//! - `Cache-Control` — `max-age=N`, `no-cache`, `no-store` +//! - `Expires` — RFC 7231 IMF-fixdate +//! - `ETag` — opaque validator quoted-string +//! - `Last-Modified` — RFC 7231 IMF-fixdate (stored verbatim for echoing back) +//! +//! All malformed values are treated as absent. The worker never panics on a +//! surprising header value — it falls back to its configured interval. + +use chrono::{DateTime, Utc}; +use reqwest::header::HeaderMap; +use std::time::Duration; + +/// Cache state extracted from a single HTTP response. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub(crate) struct CacheValidators { + /// Strong or weak ETag, including the quote marks and any `W/` prefix. + pub etag: Option, + /// `Last-Modified` value as the server sent it. Stored verbatim for echoing + /// in a subsequent `If-Modified-Since` request. + pub last_modified: Option, + /// How long the response is fresh according to the server, after the + /// `Date` of the response. Derived from `Cache-Control: max-age` (preferred) + /// or `Expires` minus the current time. + pub fresh_for: Option, + /// `Cache-Control: no-cache` was present — caller should always revalidate. + pub no_cache: bool, + /// `Cache-Control: no-store` was present — caller should not retain the body + /// across restarts (we still keep it in-memory; this flag is informational). + pub no_store: bool, +} + +impl CacheValidators { + /// Parse a [`HeaderMap`] into [`CacheValidators`]. + /// + /// `now` is injected so callers can test deterministically. In production + /// pass [`Utc::now()`]. + pub(crate) fn from_headers(headers: &HeaderMap, now: DateTime) -> Self { + let mut out = Self::default(); + + out.etag = headers + .get(reqwest::header::ETAG) + .and_then(|v| v.to_str().ok()) + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(str::to_owned); + + out.last_modified = headers + .get(reqwest::header::LAST_MODIFIED) + .and_then(|v| v.to_str().ok()) + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(str::to_owned); + + let (cc_max_age, no_cache, no_store) = headers + .get(reqwest::header::CACHE_CONTROL) + .and_then(|v| v.to_str().ok()) + .map(parse_cache_control) + .unwrap_or((None, false, false)); + out.no_cache = no_cache; + out.no_store = no_store; + + // Cache-Control: max-age wins over Expires when both are present. + if let Some(secs) = cc_max_age { + out.fresh_for = Some(Duration::from_secs(secs)); + } else if let Some(expires_at) = headers + .get(reqwest::header::EXPIRES) + .and_then(|v| v.to_str().ok()) + .and_then(parse_http_date) + { + let delta = expires_at.signed_duration_since(now); + // Negative or zero ⇒ already stale, treat as fresh_for=0 so the + // worker will revalidate immediately on its next tick. + out.fresh_for = Some(if delta.num_seconds() > 0 { + Duration::from_secs(delta.num_seconds() as u64) + } else { + Duration::ZERO + }); + } + + // no-cache forces revalidation regardless of any max-age value. + if out.no_cache { + out.fresh_for = Some(Duration::ZERO); + } + + out + } + + /// True if any conditional-request validator (ETag or Last-Modified) is + /// present. Only used by tests today; production code calls + /// [`HttpClient::get_bytes_conditional`] which inspects the fields directly. + #[cfg(test)] + pub(crate) fn has_validator(&self) -> bool { + self.etag.is_some() || self.last_modified.is_some() + } +} + +/// Parse a `Cache-Control` header value into (max_age, no_cache, no_store). +/// Tolerant: malformed directives are skipped. +fn parse_cache_control(value: &str) -> (Option, bool, bool) { + let mut max_age: Option = None; + let mut no_cache = false; + let mut no_store = false; + + for raw in value.split(',') { + let directive = raw.trim(); + if directive.is_empty() { + continue; + } + // Directives are case-insensitive per RFC 9111 + let lower = directive.to_ascii_lowercase(); + if lower == "no-cache" { + no_cache = true; + } else if lower == "no-store" { + no_store = true; + } else if let Some(rest) = lower.strip_prefix("max-age=") { + // Strip optional quotes; ignore unparseable / negative values. + let trimmed = rest.trim().trim_matches('"'); + if let Ok(secs) = trimmed.parse::() { + max_age = Some(secs); + } + } + } + + (max_age, no_cache, no_store) +} + +/// Parse an HTTP date (RFC 7231 IMF-fixdate, RFC 850, or ANSI C `asctime`). +/// Returns `None` on any parse failure. +fn parse_http_date(value: &str) -> Option> { + let trimmed = value.trim(); + // Try in order: RFC 2822 (close to IMF-fixdate), RFC 3339, then a couple of + // explicit format strings for the alternates. + if let Ok(dt) = DateTime::parse_from_rfc2822(trimmed) { + return Some(dt.with_timezone(&Utc)); + } + if let Ok(dt) = DateTime::parse_from_rfc3339(trimmed) { + return Some(dt.with_timezone(&Utc)); + } + // RFC 850, e.g. "Sunday, 06-Nov-94 08:49:37 GMT" + if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(trimmed, "%A, %d-%b-%y %H:%M:%S GMT") { + return Some(DateTime::::from_naive_utc_and_offset(naive, Utc)); + } + // ANSI C asctime(), e.g. "Sun Nov 6 08:49:37 1994" + if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(trimmed, "%a %b %e %H:%M:%S %Y") { + return Some(DateTime::::from_naive_utc_and_offset(naive, Utc)); + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; + use std::str::FromStr; + + fn headers(pairs: &[(&str, &str)]) -> HeaderMap { + let mut h = HeaderMap::new(); + for (k, v) in pairs { + h.append( + HeaderName::from_str(k).unwrap(), + HeaderValue::from_str(v).unwrap(), + ); + } + h + } + + fn t0() -> DateTime { + DateTime::parse_from_rfc3339("2026-05-22T12:00:00Z") + .unwrap() + .with_timezone(&Utc) + } + + #[test] + fn empty_headers_produce_empty_validators() { + let v = CacheValidators::from_headers(&HeaderMap::new(), t0()); + assert_eq!(v, CacheValidators::default()); + assert!(!v.has_validator()); + } + + #[test] + fn etag_and_last_modified_are_captured() { + let h = headers(&[ + ("etag", "\"abc123\""), + ("last-modified", "Sun, 06 Nov 1994 08:49:37 GMT"), + ]); + let v = CacheValidators::from_headers(&h, t0()); + assert_eq!(v.etag.as_deref(), Some("\"abc123\"")); + assert_eq!( + v.last_modified.as_deref(), + Some("Sun, 06 Nov 1994 08:49:37 GMT") + ); + assert!(v.has_validator()); + } + + #[test] + fn weak_etag_preserved_verbatim() { + let h = headers(&[("etag", "W/\"weak-tag\"")]); + let v = CacheValidators::from_headers(&h, t0()); + assert_eq!(v.etag.as_deref(), Some("W/\"weak-tag\"")); + } + + #[test] + fn cache_control_max_age_overrides_expires() { + let h = headers(&[ + ("cache-control", "max-age=600"), + ("expires", "Sun, 22 May 2027 12:00:00 GMT"), + ]); + let v = CacheValidators::from_headers(&h, t0()); + assert_eq!(v.fresh_for, Some(Duration::from_secs(600))); + } + + #[test] + fn cache_control_no_cache_zeros_freshness() { + let h = headers(&[("cache-control", "max-age=600, no-cache")]); + let v = CacheValidators::from_headers(&h, t0()); + assert!(v.no_cache); + assert_eq!(v.fresh_for, Some(Duration::ZERO)); + } + + #[test] + fn cache_control_no_store_flag() { + let h = headers(&[("cache-control", "no-store")]); + let v = CacheValidators::from_headers(&h, t0()); + assert!(v.no_store); + } + + #[test] + fn expires_in_future_yields_positive_fresh_for() { + let h = headers(&[("expires", "Fri, 22 May 2026 13:00:00 GMT")]); + let v = CacheValidators::from_headers(&h, t0()); + assert_eq!(v.fresh_for, Some(Duration::from_secs(3600))); + } + + #[test] + fn expires_in_past_yields_zero_fresh_for() { + let h = headers(&[("expires", "Mon, 01 Jan 1990 00:00:00 GMT")]); + let v = CacheValidators::from_headers(&h, t0()); + assert_eq!(v.fresh_for, Some(Duration::ZERO)); + } + + #[test] + fn malformed_max_age_is_ignored() { + let h = headers(&[("cache-control", "max-age=banana")]); + let v = CacheValidators::from_headers(&h, t0()); + assert_eq!(v.fresh_for, None); + } + + #[test] + fn negative_max_age_is_ignored() { + let h = headers(&[("cache-control", "max-age=-30")]); + let v = CacheValidators::from_headers(&h, t0()); + // u64::parse rejects "-30" so this is treated as absent. + assert_eq!(v.fresh_for, None); + } + + #[test] + fn malformed_expires_is_ignored() { + let h = headers(&[("expires", "not a date")]); + let v = CacheValidators::from_headers(&h, t0()); + assert_eq!(v.fresh_for, None); + } + + #[test] + fn directives_are_case_insensitive() { + let h = headers(&[("cache-control", "MAX-AGE=42, NO-CACHE")]); + let v = CacheValidators::from_headers(&h, t0()); + assert!(v.no_cache); + // no-cache zeroes the freshness even when max-age was set. + assert_eq!(v.fresh_for, Some(Duration::ZERO)); + } + + #[test] + fn empty_directive_segments_skipped() { + let h = headers(&[("cache-control", ",,max-age=15,,")]); + let v = CacheValidators::from_headers(&h, t0()); + assert_eq!(v.fresh_for, Some(Duration::from_secs(15))); + } + + #[test] + fn whitespace_only_etag_is_treated_as_absent() { + let h = headers(&[("etag", " ")]); + let v = CacheValidators::from_headers(&h, t0()); + assert!(v.etag.is_none()); + assert!(!v.has_validator()); + } + + #[test] + fn rfc850_date_format_parsed() { + // chrono parses with a 2-digit year extended to 19xx. + let h = headers(&[("expires", "Sunday, 22-May-94 12:00:00 GMT")]); + let v = CacheValidators::from_headers(&h, t0()); + // In the past, so fresh_for is zero (not None). + assert_eq!(v.fresh_for, Some(Duration::ZERO)); + } +} diff --git a/jans-cedarling/cedarling/src/http/mod.rs b/jans-cedarling/cedarling/src/http/mod.rs index 96303b059a8..71dd3a480cd 100644 --- a/jans-cedarling/cedarling/src/http/mod.rs +++ b/jans-cedarling/cedarling/src/http/mod.rs @@ -3,11 +3,13 @@ // // Copyright (c) 2024, Gluu, Inc. +pub(crate) mod cache_headers; mod spawn_task; pub use spawn_task::*; -use http_utils::{Backoff, HttpRequestError, Sender}; +use cache_headers::CacheValidators; +use http_utils::{Backoff, HttpRequestError, HttpRequestReasonError, Sender}; pub(crate) use reqwest::RequestBuilder; use reqwest::{Client, ClientBuilder}; use serde::Deserialize; @@ -194,6 +196,69 @@ impl HttpClient { let mut sender = self.create_sender(); sender.send_with_retry(|| f(self.raw_client.get(uri))).await } + + /// Fetches `uri` as raw bytes, honoring any previously captured cache + /// validators (`If-None-Match`, `If-Modified-Since`). Used by the policy + /// store refresh worker. + /// + /// Returns: + /// - [`ConditionalFetch::NotModified`] when the server replied `304`. + /// - [`ConditionalFetch::Modified`] with the new body and freshly parsed + /// validators otherwise. + pub(crate) async fn get_bytes_conditional( + &self, + uri: &str, + validators: &CacheValidators, + ) -> Result { + // Clone the small validator strings so the closure can be `Fn` (callable + // on each retry attempt). + let etag = validators.etag.clone(); + let last_modified = validators.last_modified.clone(); + + let response = self + .get_with_retry_with(uri, |b| { + let mut b = b; + if let Some(e) = &etag { + b = b.header(reqwest::header::IF_NONE_MATCH, e.as_str()); + } + if let Some(lm) = &last_modified { + b = b.header(reqwest::header::IF_MODIFIED_SINCE, lm.as_str()); + } + b + }) + .await?; + + let status = response.status(); + if status == reqwest::StatusCode::NOT_MODIFIED { + return Ok(ConditionalFetch::NotModified); + } + + let headers = response.headers().clone(); + let new_validators = CacheValidators::from_headers(&headers, chrono::Utc::now()); + let bytes = response.bytes().await.map(|b| b.to_vec()).map_err(|e| { + HttpRequestError::new( + HttpRequestReasonError::DecodeResponseBytes(e), + Some(status), + ) + })?; + + Ok(ConditionalFetch::Modified { + bytes, + validators: new_validators, + }) + } +} + +/// Outcome of [`HttpClient::get_bytes_conditional`]. +#[derive(Debug)] +pub(crate) enum ConditionalFetch { + /// Server responded `304 Not Modified` — body intentionally omitted. + NotModified, + /// Server returned a fresh body and (possibly) new cache validators. + Modified { + bytes: Vec, + validators: CacheValidators, + }, } #[derive(Debug)] @@ -397,4 +462,95 @@ mod test { "Expected to time out near {request_timeout:?}, took {elapsed:?}", ); } + + #[tokio::test] + async fn conditional_get_returns_not_modified_on_304() { + let mut server = Server::new_async().await; + let mock = server + .mock("GET", "/store") + .match_header("if-none-match", "\"v1\"") + .with_status(304) + .expect(1) + .create_async() + .await; + + let client = HttpClient::new(HTTP_CONF).expect("client"); + let validators = crate::http::cache_headers::CacheValidators { + etag: Some("\"v1\"".to_string()), + ..Default::default() + }; + let url = format!("{}/store", server.url()); + + let result = client + .get_bytes_conditional(&url, &validators) + .await + .expect("request"); + + assert!(matches!(result, super::ConditionalFetch::NotModified)); + mock.assert_async().await; + } + + #[tokio::test] + async fn conditional_get_captures_validators_on_200() { + let mut server = Server::new_async().await; + let mock = server + .mock("GET", "/store") + .with_status(200) + .with_header("etag", "\"v2\"") + .with_header("cache-control", "max-age=120") + .with_body(b"new-body-bytes") + .expect(1) + .create_async() + .await; + + let client = HttpClient::new(HTTP_CONF).expect("client"); + let url = format!("{}/store", server.url()); + + let result = client + .get_bytes_conditional(&url, &Default::default()) + .await + .expect("request"); + + match result { + super::ConditionalFetch::Modified { bytes, validators } => { + assert_eq!(bytes, b"new-body-bytes"); + assert_eq!(validators.etag.as_deref(), Some("\"v2\"")); + assert_eq!( + validators.fresh_for, + Some(std::time::Duration::from_secs(120)) + ); + } + other => panic!("expected Modified, got {other:?}"), + } + mock.assert_async().await; + } + + #[tokio::test] + async fn conditional_get_sends_if_modified_since() { + let mut server = Server::new_async().await; + let mock = server + .mock("GET", "/store") + .match_header( + "if-modified-since", + "Sun, 22 May 2026 12:00:00 GMT", + ) + .with_status(200) + .with_body(b"x") + .expect(1) + .create_async() + .await; + + let client = HttpClient::new(HTTP_CONF).expect("client"); + let validators = crate::http::cache_headers::CacheValidators { + last_modified: Some("Sun, 22 May 2026 12:00:00 GMT".to_string()), + ..Default::default() + }; + let url = format!("{}/store", server.url()); + + let _ = client + .get_bytes_conditional(&url, &validators) + .await + .expect("request"); + mock.assert_async().await; + } } diff --git a/jans-cedarling/cedarling/src/init/policy_store.rs b/jans-cedarling/cedarling/src/init/policy_store.rs index 536a7d55dfa..b21e6e4bbd0 100644 --- a/jans-cedarling/cedarling/src/init/policy_store.rs +++ b/jans-cedarling/cedarling/src/init/policy_store.rs @@ -139,6 +139,68 @@ async fn load_policy_store_from_lock_master( extract_first_policy_store(&agama_policy_store) } +/// Parses already-fetched Lock-Master JSON bytes into a [`PolicyStoreWithID`]. +/// Used by the policy-store refresh worker, which performs the HTTP fetch itself +/// to be able to send conditional-request headers. +pub(crate) fn parse_lock_master_bytes( + bytes: &[u8], +) -> Result { + let agama_policy_store: LegacyAgamaPolicyStore = serde_json::from_slice(bytes)?; + extract_first_policy_store(&agama_policy_store) +} + +/// Parses already-fetched `.cjar` archive bytes into a [`PolicyStoreWithID`]. +/// On native targets the schema-parsing step is offloaded to a blocking thread; +/// on WASM it runs inline since `spawn_blocking` is unavailable. +#[cfg(not(target_arch = "wasm32"))] +pub(crate) async fn parse_cjar_bytes( + bytes: &[u8], +) -> Result { + use crate::common::policy_store::loader; + + let loaded = loader::load_policy_store_archive_bytes(bytes) + .map_err(|e| PolicyStoreLoadError::Archive(format!("Failed to load from archive: {e}")))?; + + let store_id = loaded.metadata.policy_store.id.clone(); + let store_metadata = loaded.metadata.clone(); + + let legacy_store = + tokio::task::spawn_blocking(move || PolicyStoreManager::convert_to_legacy(loaded)) + .await + .map_err(|e| { + PolicyStoreLoadError::Archive(format!("Conversion task panicked: {e}")) + })??; + + Ok(PolicyStoreWithID { + id: store_id, + store: legacy_store, + metadata: Some(store_metadata), + }) +} + +/// Parses already-fetched `.cjar` archive bytes into a [`PolicyStoreWithID`] — +/// WASM build, single-threaded. +#[cfg(target_arch = "wasm32")] +pub(crate) async fn parse_cjar_bytes( + bytes: &[u8], +) -> Result { + use crate::common::policy_store::loader; + + let loaded = loader::load_policy_store_archive_bytes(bytes) + .map_err(|e| PolicyStoreLoadError::Archive(format!("Failed to load from archive: {e}")))?; + + let store_id = loaded.metadata.policy_store.id.clone(); + let store_metadata = loaded.metadata.clone(); + + let legacy_store = PolicyStoreManager::convert_to_legacy(loaded)?; + + Ok(PolicyStoreWithID { + id: store_id, + store: legacy_store, + metadata: Some(store_metadata), + }) +} + /// Loads the policy store from a Cedar Archive (.cjar) file. /// /// Uses the `load_policy_store_archive` function from the loader module @@ -378,6 +440,7 @@ mod test { source: crate::PolicyStoreSource::FileJson( Path::new("../test_files/policy-store_generated.json").into(), ), + ..Default::default() }, &HTTP_CLIENT, ) @@ -392,6 +455,7 @@ mod test { source: crate::PolicyStoreSource::FileYaml( Path::new("../test_files/policy-store_ok.yaml").into(), ), + ..Default::default() }, &HTTP_CLIENT, ) @@ -419,6 +483,7 @@ mod test { load_policy_store( &PolicyStoreConfig { source: crate::PolicyStoreSource::LockServer(uri), + ..Default::default() }, &HTTP_CLIENT, ) diff --git a/jans-cedarling/cedarling/src/init/policy_store_refresh.rs b/jans-cedarling/cedarling/src/init/policy_store_refresh.rs new file mode 100644 index 00000000000..7fde6a40ddb --- /dev/null +++ b/jans-cedarling/cedarling/src/init/policy_store_refresh.rs @@ -0,0 +1,522 @@ +// This software is available under the Apache-2.0 license. +// See https://www.apache.org/licenses/LICENSE-2.0.txt for full text. +// +// Copyright (c) 2024, Gluu, Inc. + +//! Background refresh worker for remote policy store sources. +//! +//! On each tick the worker sends a conditional GET to the configured URL, +//! handling `304 Not Modified` cheaply. When the upstream returns a new body +//! and it parses successfully, the worker builds a fresh [`Authz`] (including a +//! freshly built [`JwtService`] and [`EntityBuilder`]) and publishes it via +//! [`ArcSwap`] so in-flight authorizations are unaffected. Any failure leaves +//! the previously loaded [`Authz`] in place. +//! +//! The worker observes a shutdown signal from a [`futures::channel::oneshot`] +//! receiver — dropping the [`PolicyStoreRefreshHandle`] closes the channel and +//! the worker exits at its next select boundary. + +use arc_swap::ArcSwap; +use chrono::Utc; +use futures::channel::oneshot; +use futures::future::{Either, select}; +use std::collections::hash_map::DefaultHasher; +use std::hash::Hasher; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; + +use crate::PolicyStoreConfig; +use crate::PolicyStoreSource; +use crate::async_sleep::sleep; +use crate::authz::metrics::MetricsCollector; +use crate::authz::{Authz, AuthzConfig}; +use crate::bootstrap_config::{AuthorizationConfig, JwtConfig}; +use crate::common::policy_store::PolicyStoreWithID; +use crate::context_data_api::DataStore; +use crate::entity_builder::{EntityBuilder, TrustedIssuerIndex}; +use crate::http::cache_headers::CacheValidators; +use crate::http::{ConditionalFetch, HttpClient}; +use crate::http::{JoinHandle, spawn_task}; +use crate::jwt::JwtService; +use crate::log::interface::LogWriter; +use crate::log::{BaseLogEntry, LogEntry, LogLevel, Logger}; + +use super::policy_store::{PolicyStoreLoadError, parse_cjar_bytes, parse_lock_master_bytes}; + +/// Upper bound on exponential backoff between failed refresh attempts. +const REFRESH_FAILURE_BACKOFF_MAX_SECS: u64 = 600; + +/// Outcome of a single refresh tick — encoded as `i64` for emission via the +/// integer-enum `policy_store_refresh.last_outcome` metric. +/// +/// Note: `0` is intentionally **not** assigned to any outcome so that the +/// metrics snapshot's default value (also `0`) means "no refresh attempt +/// observed yet" rather than aliasing with `Success`. +#[repr(i64)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum RefreshOutcome { + Success = 1, + NotModified = 2, + HttpError = 3, + NetworkError = 4, + ParseError = 5, +} + +/// Captures everything the refresh worker needs to rebuild a complete [`Authz`] +/// from a freshly loaded [`PolicyStoreWithID`]. +pub(crate) struct AuthzRebuilder { + pub(crate) jwt_config: JwtConfig, + pub(crate) authorization_config: AuthorizationConfig, + pub(crate) http_client: HttpClient, + pub(crate) log: Logger, + pub(crate) data_store: Arc, + pub(crate) metrics: Arc, +} + +impl AuthzRebuilder { + /// Build a brand-new [`Authz`] from a fresh policy store. Rebuilds the JWT + /// service and entity builder so trusted-issuer and schema changes take + /// effect + pub(crate) async fn rebuild( + &self, + policy_store: PolicyStoreWithID, + ) -> Result { + policy_store + .validate_trusted_issuers() + .map_err(|e| RebuildError::TrustedIssuers(e.to_string()))?; + + let trusted_issuers = policy_store.trusted_issuers.clone(); + let jwt_service = JwtService::new( + &self.jwt_config, + trusted_issuers.clone(), + Some(self.log.clone()), + self.metrics.clone(), + self.http_client.clone(), + ) + .await + .map_err(|e| RebuildError::JwtService(e.to_string()))?; + let jwt_service = Arc::new(jwt_service); + + let issuers_map = trusted_issuers.unwrap_or_default(); + let issuers_index = TrustedIssuerIndex::new(&issuers_map, Some(&self.log)); + let schema = &policy_store.schema.validator_schema; + let default_entities = policy_store.default_entities.entities().to_owned(); + let entity_builder = EntityBuilder::new(issuers_index, Some(schema), default_entities) + .map_err(|e| RebuildError::EntityBuilder(e.to_string()))?; + let entity_builder = Arc::new(entity_builder); + + let config = AuthzConfig { + log_service: self.log.clone(), + policy_store, + jwt_service, + entity_builder, + authorization: self.authorization_config.clone(), + data_store: self.data_store.clone(), + metrics: self.metrics.clone(), + }; + + Ok(Authz::new(config)) + } +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum RebuildError { + #[error("trusted issuers validation failed: {0}")] + TrustedIssuers(String), + #[error("failed to initialize JWT service: {0}")] + JwtService(String), + #[error("failed to initialize entity builder: {0}")] + EntityBuilder(String), +} + +/// Mutable per-source state — what we learned from the previous response. +struct RefreshState { + validators: CacheValidators, + last_body_hash: Option, + consecutive_failures: u32, +} + +impl Default for RefreshState { + fn default() -> Self { + Self { + validators: CacheValidators::default(), + last_body_hash: None, + consecutive_failures: 0, + } + } +} + +impl RefreshState { + /// Returns the delay before the next refresh attempt. Starts from + /// `base_secs`; a server `Cache-Control: max-age` / `Expires` hint may + /// *shorten* the next interval but never extends it. After consecutive + /// failures the delay is exponentially backed off, capped at + /// [`REFRESH_FAILURE_BACKOFF_MAX_SECS`]. Result is floored at + /// [`PolicyStoreConfig::MIN_REFRESH_INTERVAL_SECS`] and gets ±10% jitter. + fn next_delay(&self, base_secs: u64) -> Duration { + let min_secs = PolicyStoreConfig::MIN_REFRESH_INTERVAL_SECS; + let base_secs = base_secs.max(min_secs); + + let server_fresh = self.validators.fresh_for.map(|d| d.as_secs()); + + let mut secs = match server_fresh { + Some(s) if s > 0 => s.min(base_secs), + _ => base_secs, + }; + + if self.consecutive_failures > 0 { + // Cap the doubling at 2^10 = 1024× so we don't overflow u64; the + // overall multiplied value is also clamped to + // REFRESH_FAILURE_BACKOFF_MAX_SECS below. + let shift = self.consecutive_failures.min(10); + let exp = 1u64.checked_shl(shift).unwrap_or(u64::MAX); + secs = secs + .saturating_mul(exp) + .min(REFRESH_FAILURE_BACKOFF_MAX_SECS); + } + + let secs = secs.max(min_secs); + let jitter_pct = jitter_pct(); + let adjusted = (secs as i128) + ((secs as i128) * jitter_pct / 100); + let adjusted = adjusted.max(min_secs as i128) as u64; + Duration::from_secs(adjusted) + } +} + +/// Deterministic but desynchronized jitter in `[-10, +10]`, derived from a +/// process-local counter so we avoid taking a `rand` dependency in WASM builds. +fn jitter_pct() -> i128 { + static COUNTER: AtomicU64 = AtomicU64::new(0); + let n = COUNTER.fetch_add(1, Ordering::Relaxed); + ((n % 21) as i128) - 10 +} + +fn body_hash(bytes: &[u8]) -> u64 { + let mut h = DefaultHasher::new(); + h.write(bytes); + h.finish() +} + +/// Identifies which kind of URL-based source we are refreshing — the parse step +/// differs (JSON for Lock Master, ZIP archive for `.cjar`). +#[derive(Debug, Clone)] +pub(crate) enum RefreshSource { + LockServer { url: String }, + CjarUrl { url: String }, +} + +impl RefreshSource { + pub(crate) fn from_policy_store_source(src: &PolicyStoreSource) -> Option { + match src { + PolicyStoreSource::LockServer(u) => Some(Self::LockServer { url: u.clone() }), + PolicyStoreSource::CjarUrl(u) => Some(Self::CjarUrl { url: u.clone() }), + _ => None, + } + } + + fn url(&self) -> &str { + match self { + Self::LockServer { url } | Self::CjarUrl { url } => url, + } + } + + async fn parse(&self, bytes: &[u8]) -> Result { + match self { + Self::LockServer { .. } => parse_lock_master_bytes(bytes), + Self::CjarUrl { .. } => parse_cjar_bytes(bytes).await, + } + } +} + +/// Handle to a running refresh worker. Dropping the handle signals shutdown. +pub(crate) struct PolicyStoreRefreshHandle { + shutdown: Option>, + // Held only so the worker future is not detached. The worker observes the + // closed shutdown channel and exits on its next select boundary. + _join: JoinHandle<()>, +} + +impl Drop for PolicyStoreRefreshHandle { + fn drop(&mut self) { + drop(self.shutdown.take()); + } +} + +/// Spawn the background refresh worker. Returns a [`PolicyStoreRefreshHandle`] +/// whose `Drop` signals the worker to exit. +#[allow(clippy::too_many_arguments)] +pub(crate) fn spawn_refresh_worker( + source: RefreshSource, + interval_secs: u64, + http_client: HttpClient, + rebuilder: AuthzRebuilder, + authz_swap: Arc>, + metrics: Arc, + log: Logger, +) -> PolicyStoreRefreshHandle { + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + + let join = spawn_task(async move { + run_worker( + source, + interval_secs, + http_client, + rebuilder, + authz_swap, + metrics, + log, + shutdown_rx, + ) + .await; + }); + + PolicyStoreRefreshHandle { + shutdown: Some(shutdown_tx), + _join: join, + } +} + +#[allow(clippy::too_many_arguments)] +async fn run_worker( + source: RefreshSource, + interval_secs: u64, + http_client: HttpClient, + rebuilder: AuthzRebuilder, + authz_swap: Arc>, + metrics: Arc, + log: Logger, + shutdown_rx: oneshot::Receiver<()>, +) { + let mut state = RefreshState::default(); + let mut shutdown_rx = shutdown_rx; + + log.log_any( + LogEntry::new(BaseLogEntry::new_system_opt_request_id( + LogLevel::INFO, + None, + )) + .set_message(format!( + "policy store refresh worker started (url={}, interval={}s)", + source.url(), + interval_secs, + )), + ); + + loop { + let delay = state.next_delay(interval_secs); + let sleep_fut = sleep(delay); + futures::pin_mut!(sleep_fut); + + match select(sleep_fut, &mut shutdown_rx).await { + Either::Left((_, _)) => { + let outcome = tick( + &source, + &http_client, + &rebuilder, + &authz_swap, + &mut state, + &log, + ) + .await; + metrics.record_policy_store_refresh(outcome); + }, + Either::Right(_) => { + log.log_any( + LogEntry::new(BaseLogEntry::new_system_opt_request_id( + LogLevel::DEBUG, + None, + )) + .set_message("policy store refresh worker shutting down".to_string()), + ); + break; + }, + } + } +} + +async fn tick( + source: &RefreshSource, + http_client: &HttpClient, + rebuilder: &AuthzRebuilder, + authz_swap: &Arc>, + state: &mut RefreshState, + log: &Logger, +) -> RefreshOutcome { + let url = source.url(); + let start = Utc::now(); + + let fetch_result = http_client + .get_bytes_conditional(url, &state.validators) + .await; + + match fetch_result { + Err(e) => { + state.consecutive_failures = state.consecutive_failures.saturating_add(1); + let is_net = e.is_max_retries_exceeded(); + log.log_any( + LogEntry::new(BaseLogEntry::new_system_opt_request_id( + LogLevel::WARN, + None, + )) + .set_message(format!( + "policy store refresh: {} against {url}: {e}", + if is_net { + "network error" + } else { + "HTTP error" + } + )), + ); + if is_net { + RefreshOutcome::NetworkError + } else { + RefreshOutcome::HttpError + } + }, + Ok(ConditionalFetch::NotModified) => { + state.consecutive_failures = 0; + RefreshOutcome::NotModified + }, + Ok(ConditionalFetch::Modified { bytes, validators }) => { + let new_hash = body_hash(&bytes); + + // Servers sometimes return 200 with byte-identical bodies even when + // we sent valid conditional headers. Short-circuit. + if Some(new_hash) == state.last_body_hash { + state.validators = validators; + state.consecutive_failures = 0; + return RefreshOutcome::NotModified; + } + + let parsed = match source.parse(&bytes).await { + Ok(p) => p, + Err(e) => { + state.consecutive_failures = state.consecutive_failures.saturating_add(1); + log.log_any( + LogEntry::new(BaseLogEntry::new_system_opt_request_id( + LogLevel::ERROR, + None, + )) + .set_message(format!( + "policy store refresh: parse failure for {url}: {e}" + )), + ); + return RefreshOutcome::ParseError; + }, + }; + + let new_authz = match rebuilder.rebuild(parsed).await { + Ok(a) => a, + Err(e) => { + state.consecutive_failures = state.consecutive_failures.saturating_add(1); + log.log_any( + LogEntry::new(BaseLogEntry::new_system_opt_request_id( + LogLevel::ERROR, + None, + )) + .set_message(format!( + "policy store refresh: rebuild failure for {url}: {e}" + )), + ); + return RefreshOutcome::ParseError; + }, + }; + + authz_swap.store(Arc::new(new_authz)); + state.validators = validators; + state.last_body_hash = Some(new_hash); + state.consecutive_failures = 0; + + let elapsed = Utc::now().signed_duration_since(start).num_milliseconds(); + log.log_any( + LogEntry::new(BaseLogEntry::new_system_opt_request_id( + LogLevel::INFO, + None, + )) + .set_message(format!( + "policy store refresh: swapped new store from {url} in {elapsed} ms" + )), + ); + + RefreshOutcome::Success + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn next_delay_uses_base_when_no_server_hint() { + let s = RefreshState::default(); + let d = s.next_delay(300).as_secs(); + // Allow for ±10% jitter. + assert!(d >= 270 && d <= 330, "got {d}"); + } + + #[test] + fn next_delay_honors_shorter_server_hint() { + let mut s = RefreshState::default(); + s.validators.fresh_for = Some(Duration::from_secs(60)); + let d = s.next_delay(300).as_secs(); + // Server says 60s — we pick the shorter of 60 and 300, plus jitter. + assert!(d >= 54 && d <= 66, "got {d}"); + } + + #[test] + fn next_delay_caps_server_hint_at_base() { + let mut s = RefreshState::default(); + // Server says "fresh for an hour" but operator chose 30s. + s.validators.fresh_for = Some(Duration::from_secs(3600)); + let d = s.next_delay(30).as_secs(); + // Should track the operator's 30s base, ±10% jitter — never the server's 3600s. + assert!(d >= 27 && d <= 33, "got {d}"); + } + + #[test] + fn next_delay_clamped_above_min() { + let s = RefreshState::default(); + // base = 1s but min is 5s — should be at least min. + let d = s.next_delay(1).as_secs(); + assert!(d >= PolicyStoreConfig::MIN_REFRESH_INTERVAL_SECS, "got {d}"); + } + + #[test] + fn next_delay_backs_off_on_failures() { + let mut s = RefreshState::default(); + s.consecutive_failures = 3; + let d = s.next_delay(60).as_secs(); + // 60 * 2^3 = 480, plus jitter, capped at REFRESH_FAILURE_BACKOFF_MAX_SECS=600. + // After ±10% jitter, expect between ~432 and ~528. + assert!(d >= 400 && d <= 600, "got {d}"); + } + + #[test] + fn body_hash_stable_for_same_bytes() { + let a = body_hash(b"hello"); + let b = body_hash(b"hello"); + assert_eq!(a, b); + } + + #[test] + fn body_hash_differs_for_different_bytes() { + assert_ne!(body_hash(b"hello"), body_hash(b"world")); + } + + #[test] + fn refresh_source_construction_from_url() { + let s = + RefreshSource::from_policy_store_source(&PolicyStoreSource::CjarUrl("http://x".into())); + assert!(matches!(s, Some(RefreshSource::CjarUrl { .. }))); + + let s = RefreshSource::from_policy_store_source(&PolicyStoreSource::LockServer( + "http://y".into(), + )); + assert!(matches!(s, Some(RefreshSource::LockServer { .. }))); + + let s = RefreshSource::from_policy_store_source(&PolicyStoreSource::Yaml("..".into())); + assert!(s.is_none()); + } +} diff --git a/jans-cedarling/cedarling/src/init/service_factory.rs b/jans-cedarling/cedarling/src/init/service_factory.rs index 8aff7b7e73b..0e6334d0028 100644 --- a/jans-cedarling/cedarling/src/init/service_factory.rs +++ b/jans-cedarling/cedarling/src/init/service_factory.rs @@ -77,6 +77,14 @@ impl<'a> ServiceFactory<'a> { self.service_config.policy_store.metadata.as_ref() } + /// Returns a clone of the HTTP client used during service initialization. + /// Exposed so the policy-store refresh worker can reuse the same client + /// (and therefore the same timeout / retry configuration) for its periodic + /// fetches. + pub(crate) fn http_client_for_refresh(&self) -> crate::http::HttpClient { + self.service_config.http_client.clone() + } + // get log service fn log_service(&mut self) -> log::Logger { self.log_service.clone() From fc202c28731314a0937375f5157fc94d31d66a1c Mon Sep 17 00:00:00 2001 From: haileyesus2433 Date: Wed, 27 May 2026 04:05:03 -0400 Subject: [PATCH 2/6] feat(jans-cedarling): Update policy store refresh functionality - Introduced a new `policy_store_refresh` module to manage background refresh of remote policy stores. - Updated `Cedarling` struct to utilize `ArcSwap` for atomic updates of the `Authz` instance. - Added support for a configurable refresh interval for policy stores in `BootstrapConfig`. - Implemented logic to spawn a refresh worker based on the configured refresh interval. - Enhanced deserialization for policy store refresh interval to ensure proper clamping of values. Signed-off-by: haileyesus2433 --- .../cedarling/src/bootstrap_config/decode.rs | 11 ++- .../cedarling/src/bootstrap_config/mod.rs | 1 + .../src/bootstrap_config/raw_config/config.rs | 17 +++-- .../bootstrap_config/raw_config/json_util.rs | 18 +++++ jans-cedarling/cedarling/src/init/mod.rs | 1 + jans-cedarling/cedarling/src/lib.rs | 70 ++++++++++++++++--- 6 files changed, 102 insertions(+), 16 deletions(-) diff --git a/jans-cedarling/cedarling/src/bootstrap_config/decode.rs b/jans-cedarling/cedarling/src/bootstrap_config/decode.rs index f789fbc97be..99cbae9f9d8 100644 --- a/jans-cedarling/cedarling/src/bootstrap_config/decode.rs +++ b/jans-cedarling/cedarling/src/bootstrap_config/decode.rs @@ -64,6 +64,7 @@ impl BootstrapConfig { // Case: get the policy store from a JSON string (Some(policy_store), None, None) => PolicyStoreConfig { source: PolicyStoreSource::Json(policy_store), + refresh_interval_secs: raw.policy_store_refresh_interval_secs, }, // Case: get the policy store from a URI (auto-detect .cjar archives) (None, Some(policy_store_uri), None) => { @@ -72,7 +73,10 @@ impl BootstrapConfig { } else { PolicyStoreSource::LockServer(policy_store_uri) }; - PolicyStoreConfig { source } + PolicyStoreConfig { + source, + refresh_interval_secs: raw.policy_store_refresh_interval_secs, + } }, // Case: get the policy store from a local file or directory (None, None, Some(raw_path)) => { @@ -96,7 +100,10 @@ impl BootstrapConfig { )?, } }; - PolicyStoreConfig { source } + PolicyStoreConfig { + source, + refresh_interval_secs: raw.policy_store_refresh_interval_secs, + } }, // Case: multiple polict stores were set _ => Err(BootstrapConfigLoadingError::ConflictingPolicyStores)?, diff --git a/jans-cedarling/cedarling/src/bootstrap_config/mod.rs b/jans-cedarling/cedarling/src/bootstrap_config/mod.rs index 4ade9ea0acb..865854ee6e5 100644 --- a/jans-cedarling/cedarling/src/bootstrap_config/mod.rs +++ b/jans-cedarling/cedarling/src/bootstrap_config/mod.rs @@ -83,6 +83,7 @@ impl Default for BootstrapConfig { source: PolicyStoreSource::Yaml( "cedar_version: v4.0.0\npolicy_stores: {}\n".to_string(), ), + ..Default::default() }, jwt_config: JwtConfig::new_without_validation(), authorization_config: AuthorizationConfig::default(), diff --git a/jans-cedarling/cedarling/src/bootstrap_config/raw_config/config.rs b/jans-cedarling/cedarling/src/bootstrap_config/raw_config/config.rs index d17c1588d4e..21ab888c46e 100644 --- a/jans-cedarling/cedarling/src/bootstrap_config/raw_config/config.rs +++ b/jans-cedarling/cedarling/src/bootstrap_config/raw_config/config.rs @@ -10,8 +10,8 @@ use super::default_values::{ default_enabled_feature_toggle, default_http_client_max_retries, default_http_client_retry_delay_secs, default_jti, default_jwks_refresh_min_interval, default_log_channel_capacity, default_log_max_retries, - default_status_list_refresh_interval_max, default_token_cache_capacity, - default_token_cache_max_ttl, default_true, + default_status_list_refresh_interval_max, + default_token_cache_capacity, default_token_cache_max_ttl, default_true, }; #[cfg(not(target_arch = "wasm32"))] use super::default_values::{ @@ -20,8 +20,8 @@ use super::default_values::{ use super::feature_types::{FeatureToggle, LoggerType}; use super::json_util::{ deserialize_jwks_refresh_interval, deserialize_jwks_refresh_min_interval, - deserialize_or_parse_string_as_json, deserialize_status_list_refresh_interval_max, - parse_option_string, + deserialize_or_parse_string_as_json, deserialize_policy_store_refresh_interval, + deserialize_status_list_refresh_interval_max, parse_option_string, }; use crate::jwt_config::{TrustedIssuerLoaderTypeRaw, WorkersCount}; use crate::log::LogLevel; @@ -422,6 +422,15 @@ pub struct BootstrapConfigRaw { )] #[serde(deserialize_with = "deserialize_status_list_refresh_interval_max")] pub status_list_refresh_interval_max: u64, + + /// Base refresh interval, in seconds, for periodic background refresh of + /// remote policy stores (`CjarUrl` / `LockServer`). `0` disables refresh and + /// preserves the load-once-at-startup behavior. Non-zero values below `5` + /// are clamped to `5`. A server `Cache-Control: max-age` / `Expires` hint + /// can *shorten* the next interval but never extends it. + #[serde(rename = "CEDARLING_POLICY_STORE_REFRESH_INTERVAL", default)] + #[serde(deserialize_with = "deserialize_policy_store_refresh_interval")] + pub policy_store_refresh_interval_secs: u64, } impl Default for BootstrapConfigRaw { diff --git a/jans-cedarling/cedarling/src/bootstrap_config/raw_config/json_util.rs b/jans-cedarling/cedarling/src/bootstrap_config/raw_config/json_util.rs index 1d3e11235d5..10209c6ed95 100644 --- a/jans-cedarling/cedarling/src/bootstrap_config/raw_config/json_util.rs +++ b/jans-cedarling/cedarling/src/bootstrap_config/raw_config/json_util.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Deserializer}; use serde_json::Value; +use crate::bootstrap_config::policy_store_config::PolicyStoreConfig; use crate::jwt_config::{normalize_status_list_refresh_interval_max, MIN_JWKS_REFRESH_SECS}; /// Custom parser for an Option which returns `None` if the string is empty. @@ -88,6 +89,23 @@ where Ok(value.max(MIN_JWKS_REFRESH_SECS)) } +/// Normalize the policy-store refresh interval. `0` means "disabled" and is left +/// alone. Non-zero values below `MIN_REFRESH_INTERVAL_SECS` are clamped up to +/// avoid a busy-poll against the upstream. +pub(super) fn deserialize_policy_store_refresh_interval<'de, D>( + deserializer: D, +) -> Result +where + D: serde::Deserializer<'de>, +{ + let value: u64 = deserialize_or_parse_string_as_json(deserializer)?; + Ok(if value == 0 { + 0 + } else { + value.max(PolicyStoreConfig::MIN_REFRESH_INTERVAL_SECS) + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/jans-cedarling/cedarling/src/init/mod.rs b/jans-cedarling/cedarling/src/init/mod.rs index e1adb14cc4c..09f12622059 100644 --- a/jans-cedarling/cedarling/src/init/mod.rs +++ b/jans-cedarling/cedarling/src/init/mod.rs @@ -10,6 +10,7 @@ //! - get keys for JWT validation pub(crate) mod policy_store; +pub(crate) mod policy_store_refresh; pub(crate) mod service_config; pub(crate) mod service_factory; diff --git a/jans-cedarling/cedarling/src/lib.rs b/jans-cedarling/cedarling/src/lib.rs index bf9e4e02274..41f7edea8a9 100644 --- a/jans-cedarling/cedarling/src/lib.rs +++ b/jans-cedarling/cedarling/src/lib.rs @@ -55,6 +55,9 @@ use common::app_types::{self, ApplicationName}; pub use common::policy_store::{PolicyEffect, PolicyMetadata}; pub use http::HttpClientConfig; use init::ServiceFactory; +use init::policy_store_refresh::{ + spawn_refresh_worker, AuthzRebuilder, PolicyStoreRefreshHandle, RefreshSource, +}; use init::service_config::{ServiceConfig, ServiceConfigError}; use init::service_factory::ServiceInitError; use lock::InitLockServiceError; @@ -108,8 +111,18 @@ pub enum InitCedarlingError { #[derive(Clone)] pub struct Cedarling { log: log::Logger, - authz: Arc, + /// Wrapped in [`ArcSwap`] so the policy-store refresh worker can publish a + /// freshly built [`Authz`] (with new policy store, rebuilt JWT service and + /// entity builder) atomically. Every public method snapshots via + /// [`ArcSwap::load`] so an in-flight authorization keeps using the + /// pre-swap instance. + authz: Arc>, data: Arc, + /// Held purely for its `Drop` side effect: dropping the last `Arc` closes + /// the worker's `oneshot` shutdown channel so the background refresh loop + /// exits when [`Cedarling`] goes away. The leading `_` tells the compiler + /// the field is intentionally not read. + _refresh_handle: Option>, } impl Cedarling { @@ -211,10 +224,45 @@ impl Cedarling { }); } + let authz = service_factory.authz_service().await?; + let authz_swap = Arc::new(arc_swap::ArcSwap::from(authz)); + + // Spawn the policy-store refresh worker if the source is remote and a + // non-zero refresh interval was configured. + let refresh_handle = if config.policy_store_config.refresh_enabled() { + if let Some(source) = + RefreshSource::from_policy_store_source(&config.policy_store_config.source) + { + let rebuilder = AuthzRebuilder { + jwt_config: config.jwt_config.clone(), + authorization_config: config.authorization_config.clone(), + http_client: service_factory.http_client_for_refresh(), + log: log.clone(), + data_store: data.clone(), + metrics: metrics.clone(), + }; + let handle = spawn_refresh_worker( + source, + config.policy_store_config.refresh_interval_secs, + service_factory.http_client_for_refresh(), + rebuilder, + authz_swap.clone(), + metrics.clone(), + log.clone(), + ); + Some(Arc::new(handle)) + } else { + None + } + } else { + None + }; + Ok(Cedarling { log, - authz: service_factory.authz_service().await?, + authz: authz_swap, data, + _refresh_handle: refresh_handle, }) } @@ -230,7 +278,7 @@ impl Cedarling { &self, request: RequestUnsigned, ) -> Result { - self.authz.authorize_unsigned(&request) + self.authz.load().authorize_unsigned(&request) } /// Authorize multi-issuer request. @@ -240,7 +288,7 @@ impl Cedarling { &self, request: AuthorizeMultiIssuerRequest, ) -> Result { - self.authz.authorize_multi_issuer(&request) + self.authz.load().authorize_multi_issuer(&request) } /// Returns metadata for all policies whose scope constraints are compatible @@ -255,6 +303,7 @@ impl Cedarling { resources: &[EntityData], ) -> Result, AuthorizeError> { self.authz + .load() .get_matching_policies_unsigned(principal, actions, resources) } @@ -269,6 +318,7 @@ impl Cedarling { resources: &[EntityData], ) -> Result, AuthorizeError> { self.authz + .load() .get_matching_policies_multi_issuer(tokens, actions, resources) } @@ -280,27 +330,27 @@ impl Cedarling { impl TrustedIssuerLoadingInfo for Cedarling { fn is_trusted_issuer_loaded_by_name(&self, issuer_id: &str) -> bool { - self.authz.is_trusted_issuer_loaded_by_name(issuer_id) + self.authz.load().is_trusted_issuer_loaded_by_name(issuer_id) } fn is_trusted_issuer_loaded_by_iss(&self, iss_claim: &str) -> bool { - self.authz.is_trusted_issuer_loaded_by_iss(iss_claim) + self.authz.load().is_trusted_issuer_loaded_by_iss(iss_claim) } fn total_issuers(&self) -> usize { - self.authz.total_issuers() + self.authz.load().total_issuers() } fn loaded_trusted_issuers_count(&self) -> usize { - self.authz.loaded_trusted_issuers_count() + self.authz.load().loaded_trusted_issuers_count() } fn loaded_trusted_issuer_ids(&self) -> HashSet { - self.authz.loaded_trusted_issuer_ids() + self.authz.load().loaded_trusted_issuer_ids() } fn failed_trusted_issuer_ids(&self) -> HashSet { - self.authz.failed_trusted_issuer_ids() + self.authz.load().failed_trusted_issuer_ids() } } From 0f64fa0c1e58e138ea2dd0092b427874af06eaed Mon Sep 17 00:00:00 2001 From: haileyesus2433 Date: Wed, 27 May 2026 04:05:51 -0400 Subject: [PATCH 3/6] feat(jans-cedarling): set default values for policy store configurations - Updated multiple benchmark files to include default values for `policy_store_config` in `BootstrapConfig`. - Ensured consistency across `authz_authorize_multi_issuer_benchmark.rs`, `authz_authorize_unsigned_benchmark.rs`, `context_data_store_benchmark.rs`, and `startup_benchmark.rs`. Signed-off-by: haileyesus2433 --- .../cedarling/benches/authz_authorize_multi_issuer_benchmark.rs | 1 + .../cedarling/benches/authz_authorize_unsigned_benchmark.rs | 1 + .../cedarling/benches/context_data_store_benchmark.rs | 2 ++ jans-cedarling/cedarling/benches/startup_benchmark.rs | 1 + 4 files changed, 5 insertions(+) diff --git a/jans-cedarling/cedarling/benches/authz_authorize_multi_issuer_benchmark.rs b/jans-cedarling/cedarling/benches/authz_authorize_multi_issuer_benchmark.rs index c23dae1c991..3deec138290 100644 --- a/jans-cedarling/cedarling/benches/authz_authorize_multi_issuer_benchmark.rs +++ b/jans-cedarling/cedarling/benches/authz_authorize_multi_issuer_benchmark.rs @@ -103,6 +103,7 @@ async fn prepare_cedarling_with_jwt_validation( source: cedarling::PolicyStoreSource::Yaml( serde_yaml_ng::to_string(&policy_store).expect("serialize policy store to YAML"), ), + ..Default::default() }, jwt_config: JwtConfig { jwks: None, diff --git a/jans-cedarling/cedarling/benches/authz_authorize_unsigned_benchmark.rs b/jans-cedarling/cedarling/benches/authz_authorize_unsigned_benchmark.rs index 3f9f412ffeb..b0f5b53e9b6 100644 --- a/jans-cedarling/cedarling/benches/authz_authorize_unsigned_benchmark.rs +++ b/jans-cedarling/cedarling/benches/authz_authorize_unsigned_benchmark.rs @@ -148,6 +148,7 @@ async fn prepare_cedarling() -> Result { }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE.to_string()), + ..Default::default() }, jwt_config: JwtConfig::new_without_validation(), authorization_config: AuthorizationConfig::default(), diff --git a/jans-cedarling/cedarling/benches/context_data_store_benchmark.rs b/jans-cedarling/cedarling/benches/context_data_store_benchmark.rs index 428f7c025ff..51e4805f1dd 100644 --- a/jans-cedarling/cedarling/benches/context_data_store_benchmark.rs +++ b/jans-cedarling/cedarling/benches/context_data_store_benchmark.rs @@ -94,6 +94,7 @@ static BSCONFIG: LazyLock = LazyLock::new(|| BootstrapConfig { }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE.to_string()), + ..Default::default() }, jwt_config: JwtConfig::new_without_validation(), authorization_config: AuthorizationConfig::default(), @@ -112,6 +113,7 @@ static BSCONFIG_WITH_DATA_POLICY: LazyLock = LazyLock::new(|| B }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE_WITH_DATA.to_string()), + ..Default::default() }, jwt_config: JwtConfig::new_without_validation(), authorization_config: AuthorizationConfig::default(), diff --git a/jans-cedarling/cedarling/benches/startup_benchmark.rs b/jans-cedarling/cedarling/benches/startup_benchmark.rs index db9cee24536..b82582db714 100644 --- a/jans-cedarling/cedarling/benches/startup_benchmark.rs +++ b/jans-cedarling/cedarling/benches/startup_benchmark.rs @@ -39,6 +39,7 @@ static BSCONFIG_LOCAL: LazyLock = LazyLock::new(|| BootstrapCon }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE.to_string()), + ..Default::default() }, jwt_config: JwtConfig::new_without_validation(), authorization_config: AuthorizationConfig::default(), From 872181003211a7fe49491be5405264ad8015bfbb Mon Sep 17 00:00:00 2001 From: haileyesus2433 Date: Wed, 27 May 2026 04:06:19 -0400 Subject: [PATCH 4/6] feat(jans-cedarling): set default values for policy store configurations in examples - Added default values for `policy_store_config` in multiple example files including `authorize_unsigned.rs`, `bulk_authorization_benchmark.rs`, `lock_integration.rs`, `log_init.rs`, `profiling_multi_issuer.rs`, and `profiling_unsigned.rs`. - Ensured consistency across examples to streamline configuration setup. Signed-off-by: haileyesus2433 --- jans-cedarling/cedarling/examples/authorize_unsigned.rs | 1 + .../cedarling/examples/bulk_authorization_benchmark.rs | 1 + jans-cedarling/cedarling/examples/lock_integration.rs | 1 + jans-cedarling/cedarling/examples/log_init.rs | 1 + jans-cedarling/cedarling/examples/profiling_multi_issuer.rs | 1 + jans-cedarling/cedarling/examples/profiling_unsigned.rs | 1 + 6 files changed, 6 insertions(+) diff --git a/jans-cedarling/cedarling/examples/authorize_unsigned.rs b/jans-cedarling/cedarling/examples/authorize_unsigned.rs index c9591dedd13..f43e78b1a7d 100644 --- a/jans-cedarling/cedarling/examples/authorize_unsigned.rs +++ b/jans-cedarling/cedarling/examples/authorize_unsigned.rs @@ -23,6 +23,7 @@ async fn main() -> Result<(), Box> { }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE_RAW.to_string()), + ..Default::default() }, jwt_config: JwtConfig { jwks: None, diff --git a/jans-cedarling/cedarling/examples/bulk_authorization_benchmark.rs b/jans-cedarling/cedarling/examples/bulk_authorization_benchmark.rs index 2e74fcf7679..09ddff566bd 100644 --- a/jans-cedarling/cedarling/examples/bulk_authorization_benchmark.rs +++ b/jans-cedarling/cedarling/examples/bulk_authorization_benchmark.rs @@ -82,6 +82,7 @@ async fn initialize_cedarling() -> Result> }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE_RAW.to_string()), + ..Default::default() }, jwt_config: JwtConfig { jwks: None, diff --git a/jans-cedarling/cedarling/examples/lock_integration.rs b/jans-cedarling/cedarling/examples/lock_integration.rs index 01455f34d57..a9f6b7d629f 100644 --- a/jans-cedarling/cedarling/examples/lock_integration.rs +++ b/jans-cedarling/cedarling/examples/lock_integration.rs @@ -48,6 +48,7 @@ async fn main() -> Result<(), Box> { }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE_RAW.to_string()), + ..Default::default() }, jwt_config: JwtConfig { jwks: None, diff --git a/jans-cedarling/cedarling/examples/log_init.rs b/jans-cedarling/cedarling/examples/log_init.rs index ceea28e8973..84ef1fe2d47 100644 --- a/jans-cedarling/cedarling/examples/log_init.rs +++ b/jans-cedarling/cedarling/examples/log_init.rs @@ -54,6 +54,7 @@ async fn main() -> Result<(), Box> { }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE_RAW.to_string()), + ..Default::default() }, jwt_config: JwtConfig::new_without_validation(), authorization_config: AuthorizationConfig::default(), diff --git a/jans-cedarling/cedarling/examples/profiling_multi_issuer.rs b/jans-cedarling/cedarling/examples/profiling_multi_issuer.rs index 8ab1de654dc..07a7e4e008f 100644 --- a/jans-cedarling/cedarling/examples/profiling_multi_issuer.rs +++ b/jans-cedarling/cedarling/examples/profiling_multi_issuer.rs @@ -99,6 +99,7 @@ async fn init_cedarling_multi_issuer( source: cedarling::PolicyStoreSource::Yaml( serde_yaml_ng::to_string(&policy_store).expect("serialize policy store to YAML"), ), + ..Default::default() }, jwt_config: JwtConfig { jwks: None, diff --git a/jans-cedarling/cedarling/examples/profiling_unsigned.rs b/jans-cedarling/cedarling/examples/profiling_unsigned.rs index a16f9070d34..f1b3f2d6669 100644 --- a/jans-cedarling/cedarling/examples/profiling_unsigned.rs +++ b/jans-cedarling/cedarling/examples/profiling_unsigned.rs @@ -150,6 +150,7 @@ async fn init_cedarling() -> Cedarling { }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE_RAW.to_string()), + ..Default::default() }, jwt_config: JwtConfig::new_without_validation(), authorization_config: AuthorizationConfig::default(), From 48e8b510934b57ef24a7a75e806b59dafe99714c Mon Sep 17 00:00:00 2001 From: haileyesus2433 Date: Wed, 27 May 2026 04:06:49 -0400 Subject: [PATCH 5/6] feat(jans-cedarling): add default values for policy store configurations in test files - Updated `policy_store_config` in `policy_store_loader.rs`, `ssa_validation_integration.rs`, and `cedarling_util.rs` to include default values. - Ensured consistency across test files to streamline configuration setup. Signed-off-by: haileyesus2433 --- jans-cedarling/cedarling/src/tests/policy_store_loader.rs | 2 ++ .../cedarling/src/tests/ssa_validation_integration.rs | 2 ++ jans-cedarling/cedarling/src/tests/utils/cedarling_util.rs | 1 + 3 files changed, 5 insertions(+) diff --git a/jans-cedarling/cedarling/src/tests/policy_store_loader.rs b/jans-cedarling/cedarling/src/tests/policy_store_loader.rs index 904504462e2..f09df83a80b 100644 --- a/jans-cedarling/cedarling/src/tests/policy_store_loader.rs +++ b/jans-cedarling/cedarling/src/tests/policy_store_loader.rs @@ -231,6 +231,7 @@ fn create_jwt_cedarling_config_with_loader( }, policy_store_config: PolicyStoreConfig { source: policy_store_source, + ..Default::default() }, jwt_config: JwtConfig { jwks: None, @@ -809,6 +810,7 @@ permit( let loaded = crate::init::policy_store::load_policy_store( &crate::PolicyStoreConfig { source: crate::PolicyStoreSource::CjarFile(archive_path), + ..Default::default() }, &http_client, ) diff --git a/jans-cedarling/cedarling/src/tests/ssa_validation_integration.rs b/jans-cedarling/cedarling/src/tests/ssa_validation_integration.rs index ef691cfe249..30aa5d5304c 100644 --- a/jans-cedarling/cedarling/src/tests/ssa_validation_integration.rs +++ b/jans-cedarling/cedarling/src/tests/ssa_validation_integration.rs @@ -46,6 +46,7 @@ async fn test_cedarling_with_valid_ssa() { }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE_RAW.to_string()), + ..Default::default() }, jwt_config: JwtConfig { jwks: None, @@ -99,6 +100,7 @@ async fn test_cedarling_without_ssa() { }, policy_store_config: PolicyStoreConfig { source: PolicyStoreSource::Yaml(POLICY_STORE_RAW.to_string()), + ..Default::default() }, jwt_config: JwtConfig { jwks: None, diff --git a/jans-cedarling/cedarling/src/tests/utils/cedarling_util.rs b/jans-cedarling/cedarling/src/tests/utils/cedarling_util.rs index 02b75d6db77..1c8fb5854e4 100644 --- a/jans-cedarling/cedarling/src/tests/utils/cedarling_util.rs +++ b/jans-cedarling/cedarling/src/tests/utils/cedarling_util.rs @@ -23,6 +23,7 @@ pub(crate) fn get_config(policy_source: PolicyStoreSource) -> BootstrapConfig { }, policy_store_config: PolicyStoreConfig { source: policy_source, + ..Default::default() }, jwt_config: JwtConfig::new_without_validation(), authorization_config: AuthorizationConfig::default(), From dc73570d731c09ed6f664ffa105b610ea06463aa Mon Sep 17 00:00:00 2001 From: haileyesus2433 Date: Wed, 27 May 2026 04:36:12 -0400 Subject: [PATCH 6/6] refactor(jans-cedarling): update authorization and refresh logic - Updated authorization methods in `blocking.rs` to utilize the `load()` method for improved instance management. - Refactored the refresh worker spawning logic in `lib.rs` into a new `maybe_spawn_refresh_worker` function for better clarity and maintainability. - Improved cache header handling in `cache_headers.rs` by simplifying the `from_headers` method and ensuring proper handling of cache control directives. - Enhanced `RefreshState` struct in `policy_store_refresh.rs` to streamline initialization and improve clarity in managing refresh states. Signed-off-by: haileyesus2433 --- jans-cedarling/cedarling/src/blocking.rs | 6 +- .../bootstrap_config/policy_store_config.rs | 1 + .../cedarling/src/http/cache_headers.rs | 43 ++++++----- jans-cedarling/cedarling/src/http/mod.rs | 5 +- .../src/init/policy_store_refresh.rs | 58 ++++++++------- jans-cedarling/cedarling/src/lib.rs | 73 +++++++++++-------- 6 files changed, 106 insertions(+), 80 deletions(-) diff --git a/jans-cedarling/cedarling/src/blocking.rs b/jans-cedarling/cedarling/src/blocking.rs index 6dcfa0077bd..8cdd2b9e269 100644 --- a/jans-cedarling/cedarling/src/blocking.rs +++ b/jans-cedarling/cedarling/src/blocking.rs @@ -54,7 +54,7 @@ impl Cedarling { &self, request: RequestUnsigned, ) -> Result { - self.instance.authz.authorize_unsigned(&request) + self.instance.authz.load().authorize_unsigned(&request) } /// Authorize multi-issuer request. @@ -64,7 +64,7 @@ impl Cedarling { &self, request: crate::authz::request::AuthorizeMultiIssuerRequest, ) -> Result { - self.instance.authz.authorize_multi_issuer(&request) + self.instance.authz.load().authorize_multi_issuer(&request) } /// Returns metadata for all policies whose scope constraints are compatible @@ -77,6 +77,7 @@ impl Cedarling { ) -> Result, AuthorizeError> { self.instance .authz + .load() .get_matching_policies_unsigned(principal, actions, resources) } @@ -90,6 +91,7 @@ impl Cedarling { ) -> Result, AuthorizeError> { self.instance .authz + .load() .get_matching_policies_multi_issuer(tokens, actions, resources) } diff --git a/jans-cedarling/cedarling/src/bootstrap_config/policy_store_config.rs b/jans-cedarling/cedarling/src/bootstrap_config/policy_store_config.rs index f8f668e646c..84fc8597687 100644 --- a/jans-cedarling/cedarling/src/bootstrap_config/policy_store_config.rs +++ b/jans-cedarling/cedarling/src/bootstrap_config/policy_store_config.rs @@ -31,6 +31,7 @@ impl PolicyStoreConfig { pub const MIN_REFRESH_INTERVAL_SECS: u64 = 5; /// True if the source is a remote URL and refresh is enabled. + #[must_use] pub fn refresh_enabled(&self) -> bool { self.refresh_interval_secs > 0 && matches!( diff --git a/jans-cedarling/cedarling/src/http/cache_headers.rs b/jans-cedarling/cedarling/src/http/cache_headers.rs index a3b816fd47c..e94af4a6dae 100644 --- a/jans-cedarling/cedarling/src/http/cache_headers.rs +++ b/jans-cedarling/cedarling/src/http/cache_headers.rs @@ -21,7 +21,7 @@ use std::time::Duration; /// Cache state extracted from a single HTTP response. #[derive(Debug, Clone, Default, PartialEq, Eq)] pub(crate) struct CacheValidators { - /// Strong or weak ETag, including the quote marks and any `W/` prefix. + /// Strong or weak `ETag`, including the quote marks and any `W/` prefix. pub etag: Option, /// `Last-Modified` value as the server sent it. Stored verbatim for echoing /// in a subsequent `If-Modified-Since` request. @@ -43,16 +43,14 @@ impl CacheValidators { /// `now` is injected so callers can test deterministically. In production /// pass [`Utc::now()`]. pub(crate) fn from_headers(headers: &HeaderMap, now: DateTime) -> Self { - let mut out = Self::default(); - - out.etag = headers + let etag = headers .get(reqwest::header::ETAG) .and_then(|v| v.to_str().ok()) .map(str::trim) .filter(|s| !s.is_empty()) .map(str::to_owned); - out.last_modified = headers + let last_modified = headers .get(reqwest::header::LAST_MODIFIED) .and_then(|v| v.to_str().ok()) .map(str::trim) @@ -62,14 +60,11 @@ impl CacheValidators { let (cc_max_age, no_cache, no_store) = headers .get(reqwest::header::CACHE_CONTROL) .and_then(|v| v.to_str().ok()) - .map(parse_cache_control) - .unwrap_or((None, false, false)); - out.no_cache = no_cache; - out.no_store = no_store; + .map_or((None, false, false), parse_cache_control); // Cache-Control: max-age wins over Expires when both are present. - if let Some(secs) = cc_max_age { - out.fresh_for = Some(Duration::from_secs(secs)); + let mut fresh_for = if let Some(secs) = cc_max_age { + Some(Duration::from_secs(secs)) } else if let Some(expires_at) = headers .get(reqwest::header::EXPIRES) .and_then(|v| v.to_str().ok()) @@ -78,22 +73,30 @@ impl CacheValidators { let delta = expires_at.signed_duration_since(now); // Negative or zero ⇒ already stale, treat as fresh_for=0 so the // worker will revalidate immediately on its next tick. - out.fresh_for = Some(if delta.num_seconds() > 0 { - Duration::from_secs(delta.num_seconds() as u64) + Some(if delta.num_seconds() > 0 { + Duration::from_secs(delta.num_seconds().cast_unsigned()) } else { Duration::ZERO - }); - } + }) + } else { + None + }; // no-cache forces revalidation regardless of any max-age value. - if out.no_cache { - out.fresh_for = Some(Duration::ZERO); + if no_cache { + fresh_for = Some(Duration::ZERO); } - out + Self { + etag, + last_modified, + fresh_for, + no_cache, + no_store, + } } - /// True if any conditional-request validator (ETag or Last-Modified) is + /// True if any conditional-request validator (`ETag` or `Last-Modified`) is /// present. Only used by tests today; production code calls /// [`HttpClient::get_bytes_conditional`] which inspects the fields directly. #[cfg(test)] @@ -102,7 +105,7 @@ impl CacheValidators { } } -/// Parse a `Cache-Control` header value into (max_age, no_cache, no_store). +/// Parse a `Cache-Control` header value into (`max_age`, `no_cache`, `no_store`). /// Tolerant: malformed directives are skipped. fn parse_cache_control(value: &str) -> (Option, bool, bool) { let mut max_age: Option = None; diff --git a/jans-cedarling/cedarling/src/http/mod.rs b/jans-cedarling/cedarling/src/http/mod.rs index 71dd3a480cd..3e9d501d1eb 100644 --- a/jans-cedarling/cedarling/src/http/mod.rs +++ b/jans-cedarling/cedarling/src/http/mod.rs @@ -280,6 +280,7 @@ pub(crate) type HttpClientError = HttpRequestError; #[cfg(all(test, not(target_arch = "wasm32")))] mod test { + use crate::http::cache_headers::CacheValidators; use crate::http::{HttpClient, HttpClientConfig}; use mockito::Server; @@ -507,7 +508,7 @@ mod test { let url = format!("{}/store", server.url()); let result = client - .get_bytes_conditional(&url, &Default::default()) + .get_bytes_conditional(&url, &CacheValidators::default()) .await .expect("request"); @@ -520,7 +521,7 @@ mod test { Some(std::time::Duration::from_secs(120)) ); } - other => panic!("expected Modified, got {other:?}"), + super::ConditionalFetch::NotModified => panic!("expected Modified, got NotModified"), } mock.assert_async().await; } diff --git a/jans-cedarling/cedarling/src/init/policy_store_refresh.rs b/jans-cedarling/cedarling/src/init/policy_store_refresh.rs index 7fde6a40ddb..a2c7a6893e4 100644 --- a/jans-cedarling/cedarling/src/init/policy_store_refresh.rs +++ b/jans-cedarling/cedarling/src/init/policy_store_refresh.rs @@ -131,22 +131,13 @@ pub(crate) enum RebuildError { } /// Mutable per-source state — what we learned from the previous response. +#[derive(Default)] struct RefreshState { validators: CacheValidators, last_body_hash: Option, consecutive_failures: u32, } -impl Default for RefreshState { - fn default() -> Self { - Self { - validators: CacheValidators::default(), - last_body_hash: None, - consecutive_failures: 0, - } - } -} - impl RefreshState { /// Returns the delay before the next refresh attempt. Starts from /// `base_secs`; a server `Cache-Control: max-age` / `Expires` hint may @@ -178,9 +169,12 @@ impl RefreshState { let secs = secs.max(min_secs); let jitter_pct = jitter_pct(); - let adjusted = (secs as i128) + ((secs as i128) * jitter_pct / 100); - let adjusted = adjusted.max(min_secs as i128) as u64; - Duration::from_secs(adjusted) + let secs_i128 = i128::from(secs); + let adjusted = secs_i128 + (secs_i128 * jitter_pct / 100); + let adjusted = adjusted.max(i128::from(min_secs)); + // Adjusted is bounded by `[min_secs, secs * 110/100]`, both of which fit + // in u64 since `secs` is itself a u64. + Duration::from_secs(u64::try_from(adjusted).unwrap_or(min_secs)) } } @@ -189,7 +183,7 @@ impl RefreshState { fn jitter_pct() -> i128 { static COUNTER: AtomicU64 = AtomicU64::new(0); let n = COUNTER.fetch_add(1, Ordering::Relaxed); - ((n % 21) as i128) - 10 + i128::from(n % 21) - 10 } fn body_hash(bytes: &[u8]) -> u64 { @@ -309,7 +303,7 @@ async fn run_worker( futures::pin_mut!(sleep_fut); match select(sleep_fut, &mut shutdown_rx).await { - Either::Left((_, _)) => { + Either::Left(((), _)) => { let outcome = tick( &source, &http_client, @@ -453,26 +447,36 @@ mod tests { let s = RefreshState::default(); let d = s.next_delay(300).as_secs(); // Allow for ±10% jitter. - assert!(d >= 270 && d <= 330, "got {d}"); + assert!((270..=330).contains(&d), "got {d}"); } #[test] fn next_delay_honors_shorter_server_hint() { - let mut s = RefreshState::default(); - s.validators.fresh_for = Some(Duration::from_secs(60)); + let s = RefreshState { + validators: CacheValidators { + fresh_for: Some(Duration::from_secs(60)), + ..CacheValidators::default() + }, + ..RefreshState::default() + }; let d = s.next_delay(300).as_secs(); // Server says 60s — we pick the shorter of 60 and 300, plus jitter. - assert!(d >= 54 && d <= 66, "got {d}"); + assert!((54..=66).contains(&d), "got {d}"); } #[test] fn next_delay_caps_server_hint_at_base() { - let mut s = RefreshState::default(); - // Server says "fresh for an hour" but operator chose 30s. - s.validators.fresh_for = Some(Duration::from_secs(3600)); + let s = RefreshState { + // Server says "fresh for an hour" but operator chose 30s. + validators: CacheValidators { + fresh_for: Some(Duration::from_secs(3600)), + ..CacheValidators::default() + }, + ..RefreshState::default() + }; let d = s.next_delay(30).as_secs(); // Should track the operator's 30s base, ±10% jitter — never the server's 3600s. - assert!(d >= 27 && d <= 33, "got {d}"); + assert!((27..=33).contains(&d), "got {d}"); } #[test] @@ -485,12 +489,14 @@ mod tests { #[test] fn next_delay_backs_off_on_failures() { - let mut s = RefreshState::default(); - s.consecutive_failures = 3; + let s = RefreshState { + consecutive_failures: 3, + ..RefreshState::default() + }; let d = s.next_delay(60).as_secs(); // 60 * 2^3 = 480, plus jitter, capped at REFRESH_FAILURE_BACKOFF_MAX_SECS=600. // After ±10% jitter, expect between ~432 and ~528. - assert!(d >= 400 && d <= 600, "got {d}"); + assert!((400..=600).contains(&d), "got {d}"); } #[test] diff --git a/jans-cedarling/cedarling/src/lib.rs b/jans-cedarling/cedarling/src/lib.rs index 41f7edea8a9..ca77ab5df3e 100644 --- a/jans-cedarling/cedarling/src/lib.rs +++ b/jans-cedarling/cedarling/src/lib.rs @@ -227,36 +227,14 @@ impl Cedarling { let authz = service_factory.authz_service().await?; let authz_swap = Arc::new(arc_swap::ArcSwap::from(authz)); - // Spawn the policy-store refresh worker if the source is remote and a - // non-zero refresh interval was configured. - let refresh_handle = if config.policy_store_config.refresh_enabled() { - if let Some(source) = - RefreshSource::from_policy_store_source(&config.policy_store_config.source) - { - let rebuilder = AuthzRebuilder { - jwt_config: config.jwt_config.clone(), - authorization_config: config.authorization_config.clone(), - http_client: service_factory.http_client_for_refresh(), - log: log.clone(), - data_store: data.clone(), - metrics: metrics.clone(), - }; - let handle = spawn_refresh_worker( - source, - config.policy_store_config.refresh_interval_secs, - service_factory.http_client_for_refresh(), - rebuilder, - authz_swap.clone(), - metrics.clone(), - log.clone(), - ); - Some(Arc::new(handle)) - } else { - None - } - } else { - None - }; + let refresh_handle = maybe_spawn_refresh_worker( + config, + &service_factory, + authz_swap.clone(), + log.clone(), + data.clone(), + metrics.clone(), + ); Ok(Cedarling { log, @@ -354,6 +332,41 @@ impl TrustedIssuerLoadingInfo for Cedarling { } } +/// Spawn the background policy-store refresh worker if the source is a remote +/// URL and a non-zero refresh interval was configured. Returns `None` for +/// local sources or when refresh is disabled. +fn maybe_spawn_refresh_worker( + config: &BootstrapConfig, + service_factory: &ServiceFactory<'_>, + authz_swap: Arc>, + log: log::Logger, + data: Arc, + metrics: Arc, +) -> Option> { + if !config.policy_store_config.refresh_enabled() { + return None; + } + let source = RefreshSource::from_policy_store_source(&config.policy_store_config.source)?; + let rebuilder = AuthzRebuilder { + jwt_config: config.jwt_config.clone(), + authorization_config: config.authorization_config.clone(), + http_client: service_factory.http_client_for_refresh(), + log: log.clone(), + data_store: data, + metrics: metrics.clone(), + }; + let handle = spawn_refresh_worker( + source, + config.policy_store_config.refresh_interval_secs, + service_factory.http_client_for_refresh(), + rebuilder, + authz_swap, + metrics, + log, + ); + Some(Arc::new(handle)) +} + /// Log detailed information about the loaded policy store metadata, including /// ID, version, description, Cedar version, timestamps, and compatibility with /// the runtime Cedar version.