diff --git a/AGENTS.md b/AGENTS.md index 56827a3..02ac35f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -122,7 +122,7 @@ tests/ 10. **Route-group auth guard** - Next.js `(dashboard)/layout.tsx` wraps all protected pages 11. **Mode-aware auth** - `none`/`bearer`/`credentials`/`mtls` with role checks on protected endpoints -See `docs/adr/` for all 19 Architecture Decision Records. +See `docs/adr/` for all 22 Architecture Decision Records. ## API Endpoints @@ -258,8 +258,8 @@ Completed: - Nginx reverse proxy (single-origin deployment) - ErrorBoundary wrapping all dashboard pages - Cross-entity navigation (command palette → detail pages, event↔mitigation linking, audit log → mitigations, clickable stat cards) -- Multi-signal correlation engine with signal groups, Alertmanager/FastNetMon adapters, a generic JSONPath-driven webhook adapter (ADR 020), and corroborating-only signals from coarse telemetry (ADR 021) -- 21 Architecture Decision Records +- Multi-signal correlation engine with signal groups, Alertmanager/FastNetMon adapters, a generic JSONPath-driven webhook adapter (ADR 020), corroborating-only signals from coarse telemetry (ADR 021), and exponential confidence decay over time (ADR 022) +- 22 Architecture Decision Records - CLI tool (prefixdctl) for all API operations - OpenAPI spec with utoipa annotations - 179 backend unit tests + 99 integration + 16 postgres tests (+ 17 ignored requiring GoBGP/Docker) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c54e43..60582b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.18.0] - 2026-05-11 + +### Added + +- **Confidence decay for signal groups (ADR 022).** Exponential time decay applied to per-event confidence contributions when computing `derived_confidence`. Each event's effective weight is multiplied by `0.5 ^ (age_seconds / half_life_seconds)`, so older corroborating evidence smoothly loses influence without ever being discarded. Default is disabled (`confidence_decay_half_life_seconds: 0`); zero behavior change for existing deployments. + - **Global config:** `correlation.confidence_decay_half_life_seconds: u32` (default 0). Validated to `0 ≤ H ≤ 10 × window_seconds`. + - **Per-playbook override:** `correlation_override.confidence_decay_half_life_seconds: Option`. `Some(0)` explicitly disables decay for the playbook; `None` falls through to global. + - **Reconcile loop step.** New `refresh_decayed_confidence` step iterates every open signal group each tick (default 30 s) and recomputes `derived_confidence` from current events with decay applied. No-op when decay is disabled. + - **One-shot `corroboration_met`.** When `derived_confidence` falls below the threshold due to decay, `corroboration_met` is now sticky (`met_now || was_met`) — once a group has authorized mitigation, decay never revokes that authorization for the group's lifetime. + - **Metric:** `prefixd_signal_group_decay_refreshes_total` counter ticks once per `refresh_decayed_confidence` invocation. Alert on "decay loop not running" by watching for the counter going flat. + - **UI:** Group detail page surfaces "decayed, half-life Ns" next to `derived_confidence` when decay is active for the group's effective playbook. + - 17 new tests (7 engine unit + 7 config unit + 3 integration) covering decay math, override resolution, validation, disabled paths, and one-shot stickiness. + ## [0.17.1] - 2026-05-11 ### Changed diff --git a/ROADMAP.md b/ROADMAP.md index b6cbe6f..7ac2ff4 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -301,7 +301,7 @@ Example: FastNetMon says UDP flood at 0.6 confidence + router CPU spiking + host ### Confidence Model - [x] Derived confidence from traffic patterns -- [ ] Confidence decay over time +- [x] Confidence decay over time - [x] Per-playbook thresholds --- diff --git a/docs/adr/022-confidence-decay.md b/docs/adr/022-confidence-decay.md new file mode 100644 index 0000000..243a750 --- /dev/null +++ b/docs/adr/022-confidence-decay.md @@ -0,0 +1,201 @@ +# ADR 022: Confidence Decay for Signal Groups + +**Status:** Accepted +**Date:** 2026-05-11 +**Extends:** [ADR 018 — Multi-Signal Correlation Engine](018-multi-signal-correlation-engine.md), [ADR 021 — Corroborating Signals](021-corroborating-signals.md) + +## Context + +The correlation engine (ADR 018) computes a signal group's +`derived_confidence` as the source-weighted average of per-event +confidence values: + +``` +derived = Σ(confidence_i · weight_i) / Σ(weight_i) +``` + +Once a group's `derived_confidence` clears the configured +`confidence_threshold` and `min_sources` is satisfied, +`corroboration_met` is flipped to `true` and the group is allowed to +trigger mitigations (or, in the ADR 021 corroborating-only flow, +strengthen open groups). + +This works well during an active attack — fresh telemetry keeps arriving +and the weighted average reflects current reality. It is less honest +once an incident winds down: + +1. **Long correlation windows hold stale evidence.** Operators routinely + configure `window_seconds: 3600` to absorb burst-and-recover patterns. + A high-confidence event ingested 50 minutes ago still contributes to + the average at full weight, even though everything since has been + benign. +2. **Corroborating sources from ADR 021 amplify the problem.** A + `mode: corroborating` source that fired hours ago continues to inflate + `derived_confidence` long after its operational signal has gone + silent. +3. **Operators cannot express "fresh evidence matters more"** without + abandoning windowed correlation entirely. + +The result: groups whose underlying attack has already abated continue +to read as "highly corroborated" for the remainder of the window. Any +ADR-021 corroborator that fires in that window — even on totally +unrelated telemetry — sees a green light from the cached confidence and +nudges the group toward (re-)mitigation. + +A naive fix ("drop events older than X seconds from the average") loses +useful history and produces step-function discontinuities in the score. + +## Decision + +Introduce **exponential confidence decay** on the +weighted-average computation. Each event's contribution is multiplied by +`0.5 ^ (age_seconds / half_life_seconds)` before being summed, so older +events smoothly lose weight without ever being discarded outright: + +``` +weight_eff_i = weight_i · 0.5 ^ (age_i / H) +derived = Σ(confidence_i · weight_eff_i) / Σ(weight_eff_i) +``` + +Where: + +- `age_i = now - ingested_at_i` (clamped to ≥ 0) +- `H = effective_decay_half_life_seconds` (resolved per-playbook, see below) +- `H = 0` disables decay (default; preserves ADR 018 behavior) + +### Configuration + +A new global field on `CorrelationConfig`: + +```yaml +correlation: + enabled: true + window_seconds: 3600 + min_sources: 2 + confidence_threshold: 0.7 + confidence_decay_half_life_seconds: 300 # 5-minute half-life +``` + +Per-playbook override on `PlaybookCorrelationOverride`: + +```yaml +playbooks: + - vector: udp_flood + correlation_override: + confidence_decay_half_life_seconds: 60 # faster decay for noisy vector + - vector: dns_amplification + correlation_override: + confidence_decay_half_life_seconds: 0 # explicitly disable for this playbook +``` + +Override resolution (`effective_decay_half_life()`): + +- `Some(0)` ⇒ decay explicitly disabled for this playbook +- `Some(n)` ⇒ use `n` +- `None` ⇒ fall through to global `confidence_decay_half_life_seconds` + +Validation: `0 ≤ H ≤ 10 × window_seconds`. The upper bound prevents +configuration mistakes where a half-life longer than the correlation +window would render decay effectively a no-op. + +### Compute Paths + +Two recompute paths use the decayed variant: + +1. **`POST /v1/events` ingestion.** Every event that lands in an open + group recomputes `derived_confidence` with decay applied. +2. **Reconcile loop (every tick, 30 s).** A new + `refresh_decayed_confidence` step iterates every open signal group + (`list_open_signal_groups`) and recomputes `derived_confidence` from + the current event set. This is what actually delivers the decay to + groups that aren't receiving fresh events. + +The reconcile step is a no-op when `confidence_decay_half_life_seconds` +is 0 (so users not opting in pay no extra DB cost). + +### One-Shot Corroboration (Sticky `corroboration_met`) + +When `derived_confidence` falls below `confidence_threshold` due to +decay, `corroboration_met` **must not** flap back to `false`. The flag +is sticky once set: + +```rust +corroboration_met = met_now || was_met +``` + +This preserves the operational invariant that "once mitigation was +authorized for this group, it stays authorized for the lifetime of the +group" — decay only shapes future authorizations on *other* groups, +never revokes one already granted. + +### Observability + +- **Metric:** `prefixd_signal_group_decay_refreshes_total` counter, + ticks once per `refresh_decayed_confidence` invocation (whether or not + any groups were refreshed). Lets operators alert on "decay loop not + running". +- **UI:** The group detail page surfaces "decayed, half-life Ns" next to + the `derived_confidence` value when decay is active for the group's + effective playbook, so operators can interpret the score correctly. + +## Consequences + +### Positive + +- Stale corroboration evidence loses weight smoothly without + discontinuities. +- ADR-021 corroborating sources from earlier in the window no longer + hold groups at artificially high confidence. +- Per-playbook tuning lets operators dial decay speed per vector (e.g. + faster decay for noisy UDP floods, slower for slow-and-low credential + stuffing). +- Sticky `corroboration_met` prevents flap-back of authorized + mitigations even under aggressive decay configs. +- Defaults to disabled (`H = 0`) — zero behavior change for existing + deployments. + +### Negative + +- Reconcile loop now does O(open_groups · events_per_group) DB reads + per tick when decay is enabled. For typical deployments (< 100 open + groups, < 10 events each) this is negligible, but pathological + configurations would notice. +- `derived_confidence` is no longer a pure function of "events on the + group" — it now also depends on wall-clock time. This complicates + reproducing a group's score offline; the trade-off is acceptable + given the operational gain. +- Decay does not change `source_count` (still a raw distinct-source + count). Operators relying on `source_count` for thresholding will not + see decay affect their gate; only `confidence_threshold` benefits. + +### Single-Event Math Note + +For a group with exactly one event, `derived_confidence` is unaffected +by decay (the decay factor cancels in `Σ(c·w_eff) / Σ(w_eff)`). Decay +only meaningfully shifts the score when a group has events at different +ages. This is mathematically correct and matches operator intuition: +"one piece of evidence is one piece of evidence, regardless of how old". + +## Alternatives Considered + +1. **Hard cutoff (drop events older than X).** Rejected: step-function + discontinuities in score, and loses useful history for slow-attack + detection. +2. **Linear decay.** Considered. Rejected in favor of exponential + because half-life is the unit operators reason about intuitively + ("after 5 minutes, evidence is worth half what it was") and matches + industry convention for time-decayed metrics. +3. **Per-source decay rates.** Considered. Deferred: introduces another + tuning knob whose value isn't obvious to operators and overlaps with + per-source `weight`. Global + per-playbook covers the immediate + need. +4. **Decay only on corroborating signals.** Considered. Rejected + because primary detectors also produce stale evidence (a firing + Prometheus alert that has been resolved for 40 minutes shouldn't + keep contributing at full weight either). + +## Migration + +No migration required. Default `confidence_decay_half_life_seconds: 0` +preserves ADR 018 behavior bit-for-bit. Operators opt in by setting a +non-zero value in `correlation.yaml`. diff --git a/docs/adr/README.md b/docs/adr/README.md index ddc1880..5ae0755 100644 --- a/docs/adr/README.md +++ b/docs/adr/README.md @@ -29,5 +29,6 @@ Format follows [Michael Nygard's template](https://cognitect.com/blog/2011/11/15 | [019](019-signal-adapter-architecture.md) | Signal Adapter Architecture | Accepted | 2026-03-19 | | [020](020-generic-webhook-adapter.md) | Generic Webhook Adapter | Accepted | 2026-04-18 | | [021](021-corroborating-signals.md) | Corroborating Signals | Accepted | 2026-04-19 | +| [022](022-confidence-decay.md) | Confidence Decay for Signal Groups | Accepted | 2026-05-11 | ADRs are numbered sequentially as written. Retroactive ADRs (009-013) were documented on 2026-02-18 but dated to when the decision was originally made. diff --git a/docs/api.md b/docs/api.md index 4e3db85..6c8eb74 100644 --- a/docs/api.md +++ b/docs/api.md @@ -546,7 +546,7 @@ GET /v1/signal-groups/{id} Authorization: Bearer ``` -Returns group metadata and all contributing events with source, confidence, and source weight. +Returns group metadata and all contributing events with source, confidence, and source weight. When `correlation.confidence_decay_half_life_seconds` (or the playbook override) is non-zero, `derived_confidence` is computed with exponential time decay applied — older events contribute proportionally less weight. See [ADR 022](adr/022-confidence-decay.md). The `corroboration_met` flag is sticky: once set to `true` it never reverts to `false`, even if decay drives `derived_confidence` back under the threshold. **Response:** diff --git a/docs/configuration.md b/docs/configuration.md index 211085a..1b65f6c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -284,6 +284,13 @@ correlation: # Global minimum derived confidence threshold (0.0-1.0). # A signal group must reach this threshold (in addition to min_sources) before triggering. confidence_threshold: 0.5 + + # Exponential half-life applied to per-event confidence contributions when + # computing derived_confidence. 0 disables decay (default). When set, an + # event's effective weight is multiplied by 0.5^(age_seconds / H), so + # older corroborating evidence smoothly loses influence. See ADR 022. + # Must satisfy 0 <= H <= 10 * window_seconds. + confidence_decay_half_life_seconds: 0 # Default weight for sources not listed below default_weight: 1.0 @@ -307,6 +314,7 @@ correlation: | `window_seconds` | integer | `300` | Time window for grouping signals (seconds) | | `min_sources` | integer | `1` | Minimum distinct sources to trigger mitigation | | `confidence_threshold` | float | `0.5` | Minimum derived confidence to trigger | +| `confidence_decay_half_life_seconds` | integer | `0` | Exponential half-life (seconds) for time-decaying per-event confidence contributions; 0 disables decay. Bounded by `10 × window_seconds`. See [ADR 022](adr/022-confidence-decay.md). | | `default_weight` | float | `1.0` | Weight for unknown/unconfigured sources | | `sources` | map | `{}` | Per-source weight and type configuration | @@ -367,6 +375,7 @@ playbooks: correlation: min_sources: 2 # Require corroboration for UDP floods confidence_threshold: 0.7 + confidence_decay_half_life_seconds: 60 # Faster decay for noisy vector steps: - action: police rate_bps: 5000000 @@ -379,6 +388,7 @@ When a playbook has no `correlation` override, the global defaults from `prefixd |----------------|------|-------------| | `min_sources` | integer | Override global min_sources for this playbook | | `confidence_threshold` | float | Override global confidence_threshold for this playbook | +| `confidence_decay_half_life_seconds` | integer (optional) | Override global half-life. `0` explicitly disables decay for this playbook even when the global is non-zero; omit to inherit the global value. | #### Hot Reload diff --git a/frontend/app/(dashboard)/correlation/groups/[id]/page.tsx b/frontend/app/(dashboard)/correlation/groups/[id]/page.tsx index 4297392..e1272bd 100644 --- a/frontend/app/(dashboard)/correlation/groups/[id]/page.tsx +++ b/frontend/app/(dashboard)/correlation/groups/[id]/page.tsx @@ -449,6 +449,14 @@ export default function SignalGroupDetailPage({

Derived Confidence + {correlationConfig && + correlationConfig.confidence_decay_half_life_seconds && + correlationConfig.confidence_decay_half_life_seconds > 0 ? ( + + (decayed, half-life{" "} + {correlationConfig.confidence_decay_half_life_seconds}s) + + ) : null}

diff --git a/frontend/lib/api.ts b/frontend/lib/api.ts index 4accd96..7c1d2bb 100644 --- a/frontend/lib/api.ts +++ b/frontend/lib/api.ts @@ -808,6 +808,7 @@ export interface CorrelationConfig { min_sources: number confidence_threshold: number default_weight: number + confidence_decay_half_life_seconds?: number sources: Record webhook_adapters?: WebhookAdapter[] } diff --git a/src/api/handlers.rs b/src/api/handlers.rs index a3e667b..4226edb 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -834,11 +834,16 @@ async fn handle_ban( .await .map_err(AppError)?; - let confidence_pairs: Vec<(Option, f32)> = group_events + let confidence_triples: Vec = group_events .iter() - .map(|ge| (ge.confidence, ge.source_weight)) + .map(|ge| (ge.confidence, ge.source_weight, ge.ingested_at)) .collect(); - let derived_confidence = CorrelationEngine::compute_derived_confidence(&confidence_pairs); + let half_life = correlation_config.effective_decay_half_life(playbook_override); + let derived_confidence = CorrelationEngine::compute_derived_confidence_decayed( + &confidence_triples, + chrono::Utc::now(), + half_life, + ); let source_names: Vec = group_events .iter() @@ -5553,11 +5558,6 @@ async fn recompute_group_aggregates(state: &Arc, group_id: Uuid) -> Re .await .map_err(AppError)?; - let pairs: Vec<(Option, f32)> = events - .iter() - .map(|e| (e.confidence, e.source_weight)) - .collect(); - let derived = CorrelationEngine::compute_derived_confidence(&pairs); let sources: Vec = events.iter().filter_map(|e| e.source.clone()).collect(); let count = CorrelationEngine::count_distinct_sources(&sources); @@ -5570,14 +5570,26 @@ async fn recompute_group_aggregates(state: &Arc, group_id: Uuid) -> Re // corroboration_met → true on the corroborator path. let was_met = group.corroboration_met; let mut newly_met = false; + let correlation_config = state.correlation_config.read().await.clone(); + let playbooks = state.playbooks.read().await.clone(); + let resolved_playbook = group + .playbook_name + .as_deref() + .and_then(|name| playbooks.playbooks.iter().find(|p| p.name == name)); + let playbook_override_for_decay = resolved_playbook.and_then(|p| p.correlation.as_ref()); + let half_life = correlation_config.effective_decay_half_life(playbook_override_for_decay); + let triples: Vec = events + .iter() + .map(|e| (e.confidence, e.source_weight, e.ingested_at)) + .collect(); + let derived = CorrelationEngine::compute_derived_confidence_decayed( + &triples, + chrono::Utc::now(), + half_life, + ); + let new_met = if has_primary { - let correlation_config = state.correlation_config.read().await.clone(); - let playbooks = state.playbooks.read().await.clone(); - let resolved_playbook = group - .playbook_name - .as_deref() - .and_then(|name| playbooks.playbooks.iter().find(|p| p.name == name)); - let override_ = resolved_playbook.and_then(|p| p.correlation.as_ref()); + let override_ = playbook_override_for_decay; match (resolved_playbook, group.playbook_name.as_deref()) { (Some(_), _) => { let met = CorrelationEngine::check_corroboration_with_primary( @@ -5590,7 +5602,11 @@ async fn recompute_group_aggregates(state: &Arc, group_id: Uuid) -> Re if met && !was_met { newly_met = true; } - met + // ADR 022 one-shot semantics: corroboration_met is sticky + // once true. Decay can lower derived_confidence below the + // threshold but must not undo a mitigation decision that + // already triggered. + met || was_met } (None, Some(missing)) => { tracing::debug!( @@ -5607,8 +5623,9 @@ async fn recompute_group_aggregates(state: &Arc, group_id: Uuid) -> Re } } } else { - // No primary event yet → invariant: corroboration cannot be met. - false + // No primary event yet → invariant: corroboration cannot be met + // unless it was already true (sticky). + was_met }; if newly_met { diff --git a/src/config/playbooks.rs b/src/config/playbooks.rs index 806eed6..d3a5529 100644 --- a/src/config/playbooks.rs +++ b/src/config/playbooks.rs @@ -443,6 +443,7 @@ playbooks: correlation: Some(PlaybookCorrelationOverride { min_sources: Some(2), confidence_threshold: Some(0.8), + ..Default::default() }), steps: vec![PlaybookStep { action: PlaybookAction::Police, diff --git a/src/correlation/config.rs b/src/correlation/config.rs index 542b1a7..4f8cf77 100644 --- a/src/correlation/config.rs +++ b/src/correlation/config.rs @@ -39,6 +39,14 @@ pub struct CorrelationConfig { #[serde(default = "default_weight")] pub default_weight: f32, + /// Exponential half-life (in seconds) applied to the contribution of each + /// event's confidence to the group's `derived_confidence`. An event of + /// age `t` contributes `weight * 0.5^(t / half_life_seconds)`. A value of + /// `0` (the default) disables decay entirely — equivalent to v0.17.x + /// behavior. See ADR 022. + #[serde(default)] + pub confidence_decay_half_life_seconds: u32, + /// Generic webhook adapters, each exposed at `POST /v1/signals/webhook/{name}`. /// See `docs/configuration.md` for the schema. #[serde(default, skip_serializing_if = "Vec::is_empty")] @@ -126,7 +134,7 @@ impl Default for SourceConfig { /// Per-playbook correlation override. When present on a playbook, these values /// override the global `min_sources` and `confidence_threshold` for events /// matching that playbook. -#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +#[derive(Debug, Clone, Default, Serialize, Deserialize, utoipa::ToSchema)] pub struct PlaybookCorrelationOverride { /// Override for the minimum number of distinct sources. #[serde(default)] @@ -135,6 +143,13 @@ pub struct PlaybookCorrelationOverride { /// Override for the minimum derived confidence threshold. #[serde(default)] pub confidence_threshold: Option, + + /// Override for the exponential half-life applied to event confidence + /// contributions. `Some(0)` explicitly disables decay for this + /// playbook even when the global value is non-zero; `None` falls + /// through to the global setting. See ADR 022. + #[serde(default)] + pub confidence_decay_half_life_seconds: Option, } impl Default for CorrelationConfig { @@ -146,6 +161,7 @@ impl Default for CorrelationConfig { confidence_threshold: default_confidence_threshold(), sources: HashMap::new(), default_weight: default_weight(), + confidence_decay_half_life_seconds: 0, webhook_adapters: Vec::new(), } } @@ -215,6 +231,17 @@ impl CorrelationConfig { .unwrap_or(self.confidence_threshold) } + /// Resolve the effective confidence-decay half-life (seconds), using a + /// per-playbook override if provided. `0` means decay disabled. + pub fn effective_decay_half_life( + &self, + playbook_override: Option<&PlaybookCorrelationOverride>, + ) -> u32 { + playbook_override + .and_then(|o| o.confidence_decay_half_life_seconds) + .unwrap_or(self.confidence_decay_half_life_seconds) + } + /// Resolve confidence for a given source and action type using the per-source /// `confidence_mapping`. Falls back to `default_confidence_mapping` if no /// source-specific mapping is configured. @@ -320,6 +347,20 @@ impl CorrelationConfig { errors.push("default_weight must be >= 0.0".to_string()); } + // Sanity bound: a half-life shorter than 1s or much larger than the + // correlation window is almost certainly a config error. We allow + // up to 10x the window so operators can keep long-lived correlation + // groups with slower decay without tripping validation. + if self.confidence_decay_half_life_seconds > 0 { + if self.confidence_decay_half_life_seconds > self.window_seconds.saturating_mul(10) { + errors.push(format!( + "confidence_decay_half_life_seconds ({}) must be <= 10 * window_seconds ({})", + self.confidence_decay_half_life_seconds, + self.window_seconds.saturating_mul(10) + )); + } + } + for (name, source) in &self.sources { if source.weight < 0.0 { errors.push(format!("source '{}': weight must be >= 0.0", name)); @@ -471,6 +512,7 @@ impl CorrelationConfig { "min_sources": self.min_sources, "confidence_threshold": self.confidence_threshold, "default_weight": self.default_weight, + "confidence_decay_half_life_seconds": self.confidence_decay_half_life_seconds, "sources": sources, "webhook_adapters": webhook_adapters, }) @@ -605,7 +647,7 @@ sources: }; let override_ = PlaybookCorrelationOverride { min_sources: Some(3), - confidence_threshold: None, + ..Default::default() }; assert_eq!(config.effective_min_sources(Some(&override_)), 3); } @@ -618,7 +660,7 @@ sources: }; let override_ = PlaybookCorrelationOverride { min_sources: None, - confidence_threshold: None, + ..Default::default() }; assert_eq!(config.effective_min_sources(Some(&override_)), 2); } @@ -639,8 +681,8 @@ sources: ..Default::default() }; let override_ = PlaybookCorrelationOverride { - min_sources: None, confidence_threshold: Some(0.8), + ..Default::default() }; assert_eq!(config.effective_confidence_threshold(Some(&override_)), 0.8); } @@ -900,6 +942,91 @@ sources: assert_eq!(redacted["sources"]["fastnetmon"]["type"], "detector"); } + #[test] + fn test_decay_default_is_disabled() { + let cfg = CorrelationConfig::default(); + assert_eq!(cfg.confidence_decay_half_life_seconds, 0); + assert_eq!(cfg.effective_decay_half_life(None), 0); + } + + #[test] + fn test_decay_global_used_when_no_override() { + let cfg = CorrelationConfig { + confidence_decay_half_life_seconds: 90, + ..Default::default() + }; + assert_eq!(cfg.effective_decay_half_life(None), 90); + } + + #[test] + fn test_decay_playbook_override_takes_precedence() { + let cfg = CorrelationConfig { + confidence_decay_half_life_seconds: 90, + ..Default::default() + }; + let override_ = PlaybookCorrelationOverride { + confidence_decay_half_life_seconds: Some(15), + ..Default::default() + }; + assert_eq!(cfg.effective_decay_half_life(Some(&override_)), 15); + } + + #[test] + fn test_decay_playbook_override_can_disable() { + // Some(0) explicitly disables even when global is non-zero. + let cfg = CorrelationConfig { + confidence_decay_half_life_seconds: 90, + ..Default::default() + }; + let override_ = PlaybookCorrelationOverride { + confidence_decay_half_life_seconds: Some(0), + ..Default::default() + }; + assert_eq!(cfg.effective_decay_half_life(Some(&override_)), 0); + } + + #[test] + fn test_decay_validation_rejects_excessive_half_life() { + // half_life > 10 * window is flagged as a likely misconfiguration. + let cfg = CorrelationConfig { + window_seconds: 60, + confidence_decay_half_life_seconds: 700, + ..Default::default() + }; + let errs = cfg.validate(); + assert!( + errs.iter() + .any(|e| e.contains("confidence_decay_half_life_seconds")), + "expected decay validation error, got: {:?}", + errs + ); + } + + #[test] + fn test_decay_validation_accepts_within_bound() { + let cfg = CorrelationConfig { + window_seconds: 60, + confidence_decay_half_life_seconds: 600, + ..Default::default() + }; + let errs = cfg.validate(); + assert!( + errs.is_empty(), + "expected no errors at exact 10x bound, got: {:?}", + errs + ); + } + + #[test] + fn test_decay_appears_in_redacted_view() { + let cfg = CorrelationConfig { + confidence_decay_half_life_seconds: 120, + ..Default::default() + }; + let r = cfg.redacted(); + assert_eq!(r["confidence_decay_half_life_seconds"], 120); + } + #[test] fn test_save_and_load_roundtrip() { let dir = tempfile::tempdir().unwrap(); diff --git a/src/correlation/engine.rs b/src/correlation/engine.rs index a893b5f..68d2e85 100644 --- a/src/correlation/engine.rs +++ b/src/correlation/engine.rs @@ -4,6 +4,10 @@ use uuid::Uuid; use super::config::{CorrelationConfig, MatchDimension, PlaybookCorrelationOverride}; +/// A confidence-weight-age triple used by the decay-aware compute path. +/// (confidence, source_weight, ingested_at) +pub type ConfidenceTriple = (Option, f32, Option>); + /// Represents a signal group — a collection of related attack events grouped /// by (victim_ip, vector) within a time window. #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] @@ -311,6 +315,65 @@ impl CorrelationEngine { (sum_weighted / sum_weights) as f32 } + /// Decay-aware variant of [`compute_derived_confidence`]. Each event's + /// source weight is multiplied by an exponential factor of + /// `0.5^(age_seconds / half_life_seconds)` before participating in the + /// weighted average. + /// + /// Semantics: + /// - `half_life_seconds == 0` short-circuits to the original + /// weighted average (`events_with_age` is mapped to `(confidence, weight)` + /// pairs and decay is skipped). This keeps the v0.17.x behavior + /// bit-identical when decay is disabled. + /// - Events whose `ingested_at` is `None` (e.g. older rows that + /// pre-date denormalization) are treated as age=0, i.e. they + /// contribute at full weight. This avoids destabilizing existing + /// groups on upgrade. + /// - Future-dated events (clock skew) are clamped to age=0. + /// + /// See ADR 022. + pub fn compute_derived_confidence_decayed( + events_with_age: &[ConfidenceTriple], + now: DateTime, + half_life_seconds: u32, + ) -> f32 { + if events_with_age.is_empty() { + return 0.0; + } + + if half_life_seconds == 0 { + let pairs: Vec<(Option, f32)> = + events_with_age.iter().map(|(c, w, _)| (*c, *w)).collect(); + return Self::compute_derived_confidence(&pairs); + } + + let half_life = half_life_seconds as f64; + let mut sum_weighted = 0.0f64; + let mut sum_weights = 0.0f64; + + for &(confidence, weight, ingested_at) in events_with_age { + let conf = confidence.unwrap_or(0.0) as f64; + let base_weight = weight as f64; + let age_secs = match ingested_at { + Some(t) => { + let dt = (now - t).num_milliseconds() as f64 / 1000.0; + dt.max(0.0) + } + None => 0.0, + }; + let decay = (0.5f64).powf(age_secs / half_life); + let effective_weight = base_weight * decay; + sum_weighted += conf * effective_weight; + sum_weights += effective_weight; + } + + if sum_weights == 0.0 { + return 0.0; + } + + (sum_weighted / sum_weights) as f32 + } + /// Count distinct sources from a list of source names. pub fn count_distinct_sources(sources: &[String]) -> i32 { let mut seen = std::collections::HashSet::new(); @@ -556,6 +619,89 @@ mod tests { assert!((CorrelationEngine::compute_derived_confidence(&e3) - 0.6).abs() < 0.001); } + // ── Confidence decay (ADR 022) ───────────────────────────────────── + + #[test] + fn test_decay_disabled_matches_undecayed() { + // half_life=0 ⇒ identical result to the unadorned helper, ignoring + // ingested_at entirely. + let now = Utc::now(); + let events = vec![ + (Some(0.9), 2.0, Some(now - chrono::Duration::seconds(300))), + (Some(0.3), 1.0, Some(now - chrono::Duration::seconds(900))), + ]; + let decayed = CorrelationEngine::compute_derived_confidence_decayed(&events, now, 0); + let pairs = vec![(Some(0.9), 2.0), (Some(0.3), 1.0)]; + let plain = CorrelationEngine::compute_derived_confidence(&pairs); + assert!((decayed - plain).abs() < 1e-6); + } + + #[test] + fn test_decay_single_event_at_half_life_halves_weight() { + // One event at age=half_life ⇒ effective weight halves but it's + // the only event, so derived confidence is unchanged. + let now = Utc::now(); + let events = vec![(Some(0.8), 1.0, Some(now - chrono::Duration::seconds(60)))]; + let decayed = CorrelationEngine::compute_derived_confidence_decayed(&events, now, 60); + assert!((decayed - 0.8).abs() < 1e-3); + } + + #[test] + fn test_decay_clamps_negative_age() { + // Future-dated ingested_at (clock skew) is treated as age=0. + let now = Utc::now(); + let events = vec![(Some(0.6), 1.0, Some(now + chrono::Duration::seconds(120)))]; + let decayed = CorrelationEngine::compute_derived_confidence_decayed(&events, now, 60); + assert!((decayed - 0.6).abs() < 1e-3); + } + + #[test] + fn test_decay_none_ingested_at_full_weight() { + // Rows that pre-date denormalization (ingested_at=None) contribute + // at full weight rather than being silently dropped. + let now = Utc::now(); + let events = vec![(Some(1.0), 1.0, None)]; + let decayed = CorrelationEngine::compute_derived_confidence_decayed(&events, now, 60); + assert!((decayed - 1.0).abs() < 1e-3); + } + + #[test] + fn test_decay_two_events_fresh_dominates_stale() { + // Fresh confidence=0.9, weight=1.0, age=0 ⇒ effective weight = 1.0 + // Stale confidence=0.1, weight=1.0, age=2*HL ⇒ effective weight = 0.25 + // Expected: (0.9*1.0 + 0.1*0.25) / (1.0 + 0.25) = 0.925 / 1.25 = 0.74 + let now = Utc::now(); + let events = vec![ + (Some(0.9), 1.0, Some(now)), + (Some(0.1), 1.0, Some(now - chrono::Duration::seconds(120))), + ]; + let decayed = CorrelationEngine::compute_derived_confidence_decayed(&events, now, 60); + assert!((decayed - 0.74).abs() < 1e-3); + } + + #[test] + fn test_decay_empty_events_returns_zero() { + let now = Utc::now(); + let events: Vec = vec![]; + let decayed = CorrelationEngine::compute_derived_confidence_decayed(&events, now, 60); + assert_eq!(decayed, 0.0); + } + + #[test] + fn test_decay_extreme_age_does_not_panic() { + // Very old event with reasonable half-life ⇒ contribution rounds to ~0 + // but math is finite and non-NaN. + let now = Utc::now(); + let events = vec![( + Some(0.7), + 1.0, + Some(now - chrono::Duration::seconds(10_000_000)), + )]; + let decayed = CorrelationEngine::compute_derived_confidence_decayed(&events, now, 60); + assert!(decayed.is_finite()); + assert!((0.0..=1.0).contains(&decayed)); + } + // ── Distinct source counting ─────────────────────────────────────── #[test] @@ -625,7 +771,7 @@ mod tests { }; let override_ = PlaybookCorrelationOverride { min_sources: Some(3), - confidence_threshold: None, + ..Default::default() }; // 2 sources meets global (2), but override requires 3 assert!(!CorrelationEngine::check_corroboration( @@ -651,8 +797,8 @@ mod tests { ..Default::default() }; let override_ = PlaybookCorrelationOverride { - min_sources: None, confidence_threshold: Some(0.8), + ..Default::default() }; // 0.6 meets global (0.5) but not override (0.8) assert!(!CorrelationEngine::check_corroboration( diff --git a/src/db/mock.rs b/src/db/mock.rs index c721263..eaec34f 100644 --- a/src/db/mock.rs +++ b/src/db/mock.rs @@ -681,6 +681,15 @@ impl RepositoryTrait for MockRepository { .collect()) } + async fn list_open_signal_groups(&self) -> Result> { + let groups = self.signal_groups.lock().unwrap(); + Ok(groups + .iter() + .filter(|g| g.status == SignalGroupStatus::Open) + .cloned() + .collect()) + } + async fn find_open_groups_by_dimensions( &self, vector: &Option, diff --git a/src/db/repository.rs b/src/db/repository.rs index 0051acc..a119bd6 100644 --- a/src/db/repository.rs +++ b/src/db/repository.rs @@ -1147,6 +1147,21 @@ impl RepositoryTrait for Repository { Ok(rows.into_iter().map(Into::into).collect()) } + async fn list_open_signal_groups(&self) -> Result> { + let rows: Vec = sqlx::query_as( + r#" + SELECT group_id, victim_ip, vector, created_at, window_expires_at, + derived_confidence, source_count, status, corroboration_met, + primary_dimensions, playbook_name + FROM signal_groups + WHERE status = 'open' + "#, + ) + .fetch_all(&self.pool) + .await?; + Ok(rows.into_iter().map(Into::into).collect()) + } + async fn find_open_groups_by_dimensions( &self, vector: &Option, diff --git a/src/db/traits.rs b/src/db/traits.rs index 1b64a0e..d8e7915 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -204,6 +204,11 @@ pub trait RepositoryTrait: Send + Sync { async fn count_open_groups(&self) -> Result; /// Find open signal groups whose window has expired (for expiry sweep). async fn find_expired_signal_groups(&self) -> Result>; + /// List currently open signal groups (status = 'open'). Used by the + /// reconcile loop's confidence-decay refresh path. No pagination — + /// callers should expect O(open_groups) results which in practice is + /// bounded by the corroboration window size. + async fn list_open_signal_groups(&self) -> Result>; /// Find the mitigation ID linked to a specific signal group (if any). async fn find_mitigation_id_by_signal_group( &self, diff --git a/src/main.rs b/src/main.rs index a4b73a4..c568b02 100644 --- a/src/main.rs +++ b/src/main.rs @@ -104,7 +104,8 @@ async fn main() -> anyhow::Result<()> { state.is_dry_run(), ) .with_ws_broadcast(state.ws_broadcast.clone()) - .with_alerting(state.alerting.clone()); + .with_alerting(state.alerting.clone()) + .with_app_state(state.clone()); let shutdown_rx = state.subscribe_shutdown(); tokio::spawn(async move { diff --git a/src/observability/metrics.rs b/src/observability/metrics.rs index 44c24e1..c0f647b 100644 --- a/src/observability/metrics.rs +++ b/src/observability/metrics.rs @@ -1,7 +1,7 @@ use once_cell::sync::Lazy; use prometheus::{ - CounterVec, Encoder, GaugeVec, Histogram, HistogramVec, TextEncoder, register_counter_vec, - register_gauge_vec, register_histogram, register_histogram_vec, + Counter, CounterVec, Encoder, GaugeVec, Histogram, HistogramVec, TextEncoder, register_counter, + register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec, }; // Event metrics @@ -287,6 +287,17 @@ pub static CORROBORATOR_CACHE_SIZE: Lazy = Lazy::new(|| { .unwrap() }); +/// Counter of confidence-decay refresh sweeps performed by the +/// reconciliation loop (one sample per tick, regardless of how many groups +/// were refreshed). Useful as a heartbeat for ADR 022 plumbing. +pub static SIGNAL_GROUP_DECAY_REFRESHES_TOTAL: Lazy = Lazy::new(|| { + register_counter!( + "prefixd_signal_group_decay_refreshes_total", + "Number of reconcile-loop ticks that refreshed decayed confidence for open signal groups" + ) + .unwrap() +}); + /// Generate Prometheus metrics output pub fn gather_metrics() -> String { let encoder = TextEncoder::new(); @@ -329,6 +340,7 @@ pub fn init_metrics() { Lazy::force(&CORROBORATOR_ATTACHED_TOTAL); Lazy::force(&CORROBORATOR_EXPIRED_TOTAL); Lazy::force(&CORROBORATOR_CACHE_SIZE); + Lazy::force(&SIGNAL_GROUP_DECAY_REFRESHES_TOTAL); } /// Update database pool metrics from sqlx pool stats diff --git a/src/scheduler/reconcile.rs b/src/scheduler/reconcile.rs index 6609070..a79c299 100644 --- a/src/scheduler/reconcile.rs +++ b/src/scheduler/reconcile.rs @@ -4,9 +4,10 @@ use tokio::sync::broadcast; use crate::alerting::AlertingService; use crate::bgp::FlowSpecAnnouncer; -use crate::correlation::SignalGroupStatus; +use crate::correlation::{CorrelationEngine, SignalGroupStatus}; use crate::db::RepositoryTrait; use crate::domain::{FlowSpecAction, FlowSpecNlri, FlowSpecRule, MitigationStatus}; +use crate::state::AppState; use crate::ws::WsMessage; use tokio::sync::RwLock; @@ -17,6 +18,11 @@ pub struct ReconciliationLoop { dry_run: bool, ws_broadcast: Option>, alerting: Option>>>, + /// Shared application state — used by the ADR 022 confidence-decay + /// refresh path to read the current correlation config + playbooks + /// on each tick (so hot-reloads propagate without restart). Optional + /// so test harnesses can construct the loop standalone. + state: Option>, /// Set of source labels we last set on /// `CORROBORATOR_CACHE_SIZE`. Used to zero-out gauges when a /// source's cache drains to empty between ticks (Prometheus would @@ -38,10 +44,20 @@ impl ReconciliationLoop { dry_run, ws_broadcast: None, alerting: None, + state: None, last_cache_sources: tokio::sync::Mutex::new(std::collections::HashSet::new()), } } + /// Wire the shared `AppState` for paths that need live access to the + /// correlation config or playbooks (currently: ADR 022 confidence + /// decay refresh). Without this, decay refresh is a no-op and + /// `derived_confidence` is only updated on event ingest. + pub fn with_app_state(mut self, state: Arc) -> Self { + self.state = Some(state); + self + } + /// Set the WebSocket broadcast sender for real-time notifications pub fn with_ws_broadcast(mut self, sender: broadcast::Sender) -> Self { self.ws_broadcast = Some(sender); @@ -114,6 +130,10 @@ impl ReconciliationLoop { // 2b. Sweep expired corroborating signals from the floating cache (ADR 021) self.sweep_corroborator_cache().await?; + // 2c. Refresh decayed confidence on open signal groups (ADR 022). + // No-op when confidence decay is disabled or wiring isn't present. + self.refresh_decayed_confidence().await?; + // 3. Sync desired vs actual state self.sync_announcements().await?; @@ -254,6 +274,74 @@ impl ReconciliationLoop { Ok(()) } + /// ADR 022: re-compute and persist `derived_confidence` on every open + /// signal group using the configured exponential half-life. Skipped + /// when correlation config / playbooks aren't wired (e.g. in tests) + /// or when decay is disabled (`half_life_seconds == 0`). + /// + /// Enforces one-shot `corroboration_met` semantics: a group whose + /// flag is already true is never flipped back to false by decay, + /// even if its decayed confidence drops below threshold. Decay only + /// affects the stored value. + async fn refresh_decayed_confidence(&self) -> anyhow::Result<()> { + let Some(state) = self.state.as_ref() else { + return Ok(()); + }; + + let cfg = state.correlation_config.read().await.clone(); + if cfg.confidence_decay_half_life_seconds == 0 { + return Ok(()); + } + + let groups = self.repo.list_open_signal_groups().await?; + if groups.is_empty() { + crate::observability::metrics::SIGNAL_GROUP_DECAY_REFRESHES_TOTAL.inc(); + return Ok(()); + } + + let playbooks = state.playbooks.read().await.clone(); + let now = chrono::Utc::now(); + + for group in groups { + let events = self.repo.list_signal_group_events(group.group_id).await?; + if events.is_empty() { + continue; + } + + let resolved_playbook = group + .playbook_name + .as_deref() + .and_then(|name| playbooks.playbooks.iter().find(|p| p.name == name)); + let override_ = resolved_playbook.and_then(|p| p.correlation.as_ref()); + let half_life = cfg.effective_decay_half_life(override_); + if half_life == 0 { + continue; + } + + let triples: Vec = events + .iter() + .map(|e| (e.confidence, e.source_weight, e.ingested_at)) + .collect(); + let new_derived = + CorrelationEngine::compute_derived_confidence_decayed(&triples, now, half_life); + + // Skip rewrite when nothing meaningful changed (avoids + // churning the row + WAL on idle groups). + if (new_derived - group.derived_confidence).abs() < 0.0005 { + continue; + } + + let mut updated = group; + updated.derived_confidence = new_derived; + // corroboration_met is sticky once true (ADR 022); the field + // is left as-is. source_count is unaffected by decay. + self.repo.update_signal_group(&updated).await?; + } + + crate::observability::metrics::SIGNAL_GROUP_DECAY_REFRESHES_TOTAL.inc(); + Ok(()) + } + async fn sync_announcements(&self) -> anyhow::Result<()> { // Page through all active mitigations using cursor pagination let mut active = Vec::new(); diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 9bef2bf..b26f235 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -488,7 +488,7 @@ impl E2ETestContext { confidence_threshold, sources, default_weight: 1.0, - webhook_adapters: Vec::new(), + ..Default::default() }; let state = AppState::new( diff --git a/tests/integration.rs b/tests/integration.rs index 81e83ae..938a4de 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1914,7 +1914,7 @@ fn test_settings_with_correlation( m }, default_weight: 1.0, - webhook_adapters: Vec::new(), + ..Default::default() }; settings } @@ -5707,6 +5707,7 @@ async fn test_late_corroborator_finalizes_with_playbook_override() { correlation: Some(PlaybookCorrelationOverride { min_sources: Some(2), confidence_threshold: Some(0.5), + ..Default::default() }), steps: vec![PlaybookStep { action: PlaybookAction::Police, @@ -5931,6 +5932,324 @@ async fn test_late_corroborator_skips_when_playbook_name_is_stale() { ); } +// ── ADR 022: confidence decay over time ───────────────────────────────── + +#[tokio::test] +async fn test_decay_refresh_lowers_derived_confidence() { + // With two events of different ages and confidences, exponential decay + // shifts the weighted average toward the fresher event. We seed: + // - Aged event: confidence=0.9, weight=1.0, age=~3 half-lives (180s) + // - Fresh event: confidence=0.1, weight=1.0, age=~0 + // Without decay: weighted avg = (0.9 + 0.1) / 2 = 0.5 + // With decay (HL=60): aged effective weight = 1.0 * 0.5^3 = 0.125 + // ⇒ (0.9*0.125 + 0.1*1.0) / (0.125 + 1.0) = 0.2125 / 1.125 ≈ 0.189 + use chrono::{Duration, Utc}; + use prefixd::bgp::FlowSpecAnnouncer; + use prefixd::correlation::engine::{PrimaryDimensions, SignalGroup, SignalGroupStatus}; + use prefixd::scheduler::ReconciliationLoop; + use uuid::Uuid; + + let repo: Arc = Arc::new(MockRepository::new()); + let announcer: Arc = Arc::new(MockAnnouncer::new()); + + let mut settings = test_settings_with_correlation(true, 1, 0.0); + settings.correlation.window_seconds = 3600; + settings.correlation.confidence_decay_half_life_seconds = 60; + + let now = Utc::now(); + let aged_ingested_at = now - Duration::seconds(180); // 3 half-lives + + // Aged event: high confidence but old + let aged_id = Uuid::new_v4(); + repo.insert_event(&prefixd::domain::AttackEvent { + event_id: aged_id, + external_event_id: None, + source: "detector_a".to_string(), + victim_ip: "203.0.113.10".to_string(), + event_timestamp: aged_ingested_at, + ingested_at: aged_ingested_at, + vector: "udp_flood".to_string(), + protocol: None, + bps: None, + pps: None, + top_dst_ports_json: "[]".to_string(), + confidence: Some(0.9), + action: "ban".to_string(), + raw_details: None, + }) + .await + .unwrap(); + + // Fresh event: low confidence but recent + let fresh_id = Uuid::new_v4(); + repo.insert_event(&prefixd::domain::AttackEvent { + event_id: fresh_id, + external_event_id: None, + source: "detector_b".to_string(), + victim_ip: "203.0.113.10".to_string(), + event_timestamp: now, + ingested_at: now, + vector: "udp_flood".to_string(), + protocol: None, + bps: None, + pps: None, + top_dst_ports_json: "[]".to_string(), + confidence: Some(0.1), + action: "ban".to_string(), + raw_details: None, + }) + .await + .unwrap(); + + let group_id = Uuid::new_v4(); + repo.insert_signal_group(&SignalGroup { + group_id, + victim_ip: "203.0.113.10".to_string(), + vector: "udp_flood".to_string(), + created_at: aged_ingested_at, + window_expires_at: now + Duration::seconds(1800), + derived_confidence: 0.5, // pre-decay weighted average + source_count: 2, + status: SignalGroupStatus::Open, + corroboration_met: false, + primary_dimensions: PrimaryDimensions::default(), + playbook_name: None, + }) + .await + .unwrap(); + repo.add_event_to_group(group_id, aged_id, 1.0) + .await + .unwrap(); + repo.add_event_to_group(group_id, fresh_id, 1.0) + .await + .unwrap(); + + let state = AppState::new( + settings, + test_inventory(), + test_playbooks(), + repo.clone(), + announcer.clone(), + std::path::PathBuf::from("."), + ) + .expect("state"); + + let reconciler = ReconciliationLoop::new(repo.clone(), announcer.clone(), 30, true) + .with_app_state(state.clone()); + reconciler.reconcile().await.expect("reconcile"); + + let after = repo.get_signal_group(group_id).await.unwrap().unwrap(); + // Decay should shift average toward the fresh low-confidence event. + // Expected ≈ 0.189, definitely < 0.5 (the undecayed average) + assert!( + after.derived_confidence < 0.35, + "expected decayed confidence < 0.35, got {}", + after.derived_confidence + ); + assert!( + after.derived_confidence > 0.1, + "should not drop to floor of fresh event alone, got {}", + after.derived_confidence + ); + assert_eq!( + after.source_count, 2, + "source_count unaffected by decay (ADR 022)" + ); +} + +#[tokio::test] +async fn test_decay_one_shot_corroboration_no_flap_back() { + // corroboration_met=true is sticky: once set, decay-induced drops in + // derived_confidence below the configured threshold must NOT flip + // the flag back to false. We seed two events (one aged, one fresh) + // so decay actually shifts the weighted average. + use chrono::{Duration, Utc}; + use prefixd::bgp::FlowSpecAnnouncer; + use prefixd::correlation::engine::{PrimaryDimensions, SignalGroup, SignalGroupStatus}; + use prefixd::scheduler::ReconciliationLoop; + use uuid::Uuid; + + let repo: Arc = Arc::new(MockRepository::new()); + let announcer: Arc = Arc::new(MockAnnouncer::new()); + + let mut settings = test_settings_with_correlation(true, 1, 0.5); + settings.correlation.window_seconds = 3600; + settings.correlation.confidence_decay_half_life_seconds = 60; + + let now = Utc::now(); + let aged_ingested_at = now - Duration::seconds(300); // 5 half-lives + + // Aged event (conf=0.1, weight=1.0) + let aged_id = Uuid::new_v4(); + repo.insert_event(&prefixd::domain::AttackEvent { + event_id: aged_id, + external_event_id: None, + source: "detector_a".to_string(), + victim_ip: "203.0.113.10".to_string(), + event_timestamp: aged_ingested_at, + ingested_at: aged_ingested_at, + vector: "udp_flood".to_string(), + protocol: None, + bps: None, + pps: None, + top_dst_ports_json: "[]".to_string(), + confidence: Some(0.1), + action: "ban".to_string(), + raw_details: None, + }) + .await + .unwrap(); + + // Fresh event (conf=0.1, weight=1.0) + let fresh_id = Uuid::new_v4(); + repo.insert_event(&prefixd::domain::AttackEvent { + event_id: fresh_id, + external_event_id: None, + source: "detector_b".to_string(), + victim_ip: "203.0.113.10".to_string(), + event_timestamp: now, + ingested_at: now, + vector: "udp_flood".to_string(), + protocol: None, + bps: None, + pps: None, + top_dst_ports_json: "[]".to_string(), + confidence: Some(0.1), + action: "ban".to_string(), + raw_details: None, + }) + .await + .unwrap(); + + let group_id = Uuid::new_v4(); + repo.insert_signal_group(&SignalGroup { + group_id, + victim_ip: "203.0.113.10".to_string(), + vector: "udp_flood".to_string(), + created_at: aged_ingested_at, + window_expires_at: now + Duration::seconds(1800), + derived_confidence: 0.7, // was high when corroboration triggered + source_count: 2, + status: SignalGroupStatus::Open, + corroboration_met: true, // already triggered earlier + primary_dimensions: PrimaryDimensions::default(), + playbook_name: None, + }) + .await + .unwrap(); + repo.add_event_to_group(group_id, aged_id, 1.0) + .await + .unwrap(); + repo.add_event_to_group(group_id, fresh_id, 1.0) + .await + .unwrap(); + + let state = AppState::new( + settings, + test_inventory(), + test_playbooks(), + repo.clone(), + announcer.clone(), + std::path::PathBuf::from("."), + ) + .expect("state"); + + let reconciler = ReconciliationLoop::new(repo.clone(), announcer.clone(), 30, true) + .with_app_state(state.clone()); + reconciler.reconcile().await.expect("reconcile"); + + let after = repo.get_signal_group(group_id).await.unwrap().unwrap(); + // Both events have conf=0.1 so decayed average will be ~0.1 < 0.5 threshold + assert!( + after.derived_confidence < 0.5, + "decay should drop confidence below the 0.5 threshold (got {})", + after.derived_confidence + ); + assert!( + after.corroboration_met, + "corroboration_met must stay true even when decay drops confidence below threshold" + ); +} + +#[tokio::test] +async fn test_decay_disabled_does_not_touch_groups() { + // With half_life_seconds=0, refresh_decayed_confidence is a no-op + // even on groups with very old events. + use chrono::{Duration, Utc}; + use prefixd::bgp::FlowSpecAnnouncer; + use prefixd::correlation::engine::{PrimaryDimensions, SignalGroup, SignalGroupStatus}; + use prefixd::scheduler::ReconciliationLoop; + use uuid::Uuid; + + let repo: Arc = Arc::new(MockRepository::new()); + let announcer: Arc = Arc::new(MockAnnouncer::new()); + + let settings = test_settings_with_correlation(true, 1, 0.5); + assert_eq!(settings.correlation.confidence_decay_half_life_seconds, 0); + + let now = Utc::now(); + let event_id = Uuid::new_v4(); + repo.insert_event(&prefixd::domain::AttackEvent { + event_id, + external_event_id: None, + source: "detector_a".to_string(), + victim_ip: "203.0.113.10".to_string(), + event_timestamp: now - Duration::seconds(3600), + ingested_at: now - Duration::seconds(3600), + vector: "udp_flood".to_string(), + protocol: None, + bps: None, + pps: None, + top_dst_ports_json: "[]".to_string(), + confidence: Some(0.85), + action: "ban".to_string(), + raw_details: None, + }) + .await + .unwrap(); + + let group_id = Uuid::new_v4(); + repo.insert_signal_group(&SignalGroup { + group_id, + victim_ip: "203.0.113.10".to_string(), + vector: "udp_flood".to_string(), + created_at: now - Duration::seconds(3600), + window_expires_at: now + Duration::seconds(7200), + derived_confidence: 0.85, + source_count: 1, + status: SignalGroupStatus::Open, + corroboration_met: false, + primary_dimensions: PrimaryDimensions::default(), + playbook_name: None, + }) + .await + .unwrap(); + repo.add_event_to_group(group_id, event_id, 1.0) + .await + .unwrap(); + + let state = AppState::new( + settings, + test_inventory(), + test_playbooks(), + repo.clone(), + announcer.clone(), + std::path::PathBuf::from("."), + ) + .expect("state"); + + let reconciler = ReconciliationLoop::new(repo.clone(), announcer.clone(), 30, true) + .with_app_state(state.clone()); + reconciler.reconcile().await.expect("reconcile"); + + let after = repo.get_signal_group(group_id).await.unwrap().unwrap(); + assert!( + (after.derived_confidence - 0.85).abs() < 1e-3, + "decay disabled ⇒ derived_confidence unchanged; got {}", + after.derived_confidence + ); +} + #[tokio::test] async fn test_cache_listing_endpoint_returns_unattached_signals() { use chrono::{Duration, Utc};