From 7bb6c52db29c5cba12a98946cff565507b567d86 Mon Sep 17 00:00:00 2001 From: Lance Tuller Date: Tue, 12 May 2026 10:28:22 -0400 Subject: [PATCH] feat(correlation): generic webhook adapter transforms Add per-field transforms applied after JSONPath extraction in the generic webhook adapter (ADR 020). Three variants cover the common shapes detectors emit in the wild: - unit_conversion: multiply an extracted numeric value (bps, pps, confidence) by a constant. Useful for Mbps->bps, kpps->pps, % -> ratio. - regex_extract: pull a capture group out of an extracted vector string. Lets operators map free-form alert descriptions onto prefixd vector names without writing a shim. - computed: replace a numeric field's value with scale * product of one or more JSONPath extractions. Lets operators derive fields not present directly in the payload (bps = packets * avg_size * 8). All transforms are validated at config load: regex compiles, multipliers are finite, transform variant matches field type (unit_conversion/computed -> numeric, regex_extract -> string), field name is one of the four whitelisted ones. Misconfigurations fail fast on POST /v1/config/reload rather than silently producing zero events. Confidence pipeline order: JSONPath extract -> transform -> confidence_scale divisor -> clamp to [0, 1]. Adds regex 1.x as a top-level dependency. Tests: 17 new (14 unit + 3 end-to-end integration) covering each variant, compile-time validation, type-mismatch rejection, missing-value passthrough, and clamping interaction with confidence_scale. 
--- CHANGELOG.md | 14 +- Cargo.lock | 1 + Cargo.toml | 1 + ROADMAP.md | 2 +- docs/configuration.md | 25 ++ src/correlation/config.rs | 1 + src/correlation/mod.rs | 4 +- src/correlation/webhook.rs | 593 +++++++++++++++++++++++++++++++++++-- tests/integration.rs | 131 ++++++++ 9 files changed, 749 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17dd382..bef3682 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.18.1] - 2026-05-11 + +### Added + +- **Generic webhook adapter transforms.** Per-field transforms applied after JSONPath extraction in the generic webhook adapter (ADR 020). Three variants cover the common shapes: + - `unit_conversion` — multiply an extracted numeric value (`bps`, `pps`, `confidence`) by a constant. Useful for `Mbps→bps`, `kpps→pps`, percentage → ratio. + - `regex_extract` — pull a capture group out of an extracted `vector` string. Lets operators map free-form alert descriptions ("Detected: udp_flood at 250Gbps") onto prefixd vector names without writing a shim. + - `computed` — replace a numeric field's value with `scale * Π(extract(path_i))`. Lets operators derive fields not directly present in the payload (e.g. `bps = packets × avg_size × 8`). + - All transforms are validated at config load (regex compiles, multipliers are finite, transform shape matches field type) so misconfigurations fail fast on `POST /v1/config/reload` rather than silently producing zero events. + - 17 new tests (14 unit + 3 end-to-end integration) covering each variant, compile-time validation, type-mismatch rejection, missing-value passthrough, and interaction with the existing `confidence_scale` clamp pipeline. 
+ ## [0.18.0] - 2026-05-11 ### Added @@ -920,7 +931,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Safelist prevents mitigation of protected infrastructure - Guardrails block overly broad mitigations -[Unreleased]: https://github.com/lance0/prefixd/compare/v0.18.0...HEAD +[Unreleased]: https://github.com/lance0/prefixd/compare/v0.18.1...HEAD +[0.18.1]: https://github.com/lance0/prefixd/compare/v0.18.0...v0.18.1 [0.18.0]: https://github.com/lance0/prefixd/compare/v0.17.1...v0.18.0 [0.17.1]: https://github.com/lance0/prefixd/compare/v0.17.0...v0.17.1 [0.17.0]: https://github.com/lance0/prefixd/compare/v0.16.0...v0.17.0 diff --git a/Cargo.lock b/Cargo.lock index bc4a8fb..2bc8ed3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2321,6 +2321,7 @@ dependencies = [ "prost-build", "prost-types", "rand 0.10.1", + "regex", "reqwest", "rustls", "rustls-pemfile", diff --git a/Cargo.toml b/Cargo.toml index d6c5b71..4969617 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ hmac = "0.13" hex = "0.4" subtle = "2" serde_json_path = "0.7" +regex = "1" ipnet = { version = "2", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } async-trait = "0.1" diff --git a/ROADMAP.md b/ROADMAP.md index 19cc287..47def2e 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -280,7 +280,7 @@ Example: FastNetMon says UDP flood at 0.6 confidence + router CPU spiking + host - [x] FastNetMon webhook adapter (`POST /v1/signals/fastnetmon`) — classifies vectors from traffic breakdown, configurable confidence mapping - [x] Generic webhook adapter (`POST /v1/signals/webhook/{name}`) — operator-configured JSONPath mapping, HMAC/bearer/none auth, array batching via root_path (ADR 020) - [ ] Router telemetry adapter (JTI, gNMI) -- [ ] Generic adapter transform functions (unit conversion, regex extract, computed fields) +- [x] Generic adapter transform functions (unit conversion, regex extract, computed fields) ### Correlation Engine diff --git 
a/docs/configuration.md b/docs/configuration.md index 1b65f6c..8b35ff7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -426,6 +426,20 @@ correlation: default_vector: unknown confidence_scale: 100 # divide extracted value by this source_id_prefix: "radware-" + transforms: # optional per-field transforms + bps: + type: unit_conversion # Mbps -> bps + multiplier: 1000000 + vector: + type: regex_extract # pull "udp_flood" out of free-form text + pattern: "(\\w+)_flood" + group: 0 # 0=whole match, 1+=capture group N + pps: + type: computed # derive pps from packets/second fields + paths: + - "$.metrics.packets" + - "$.metrics.duration_inv" + scale: 1.0 ``` **Adapter schema:** @@ -449,6 +463,17 @@ correlation: | `default_vector` | no | string | Fallback when vector missing or not in map | | `confidence_scale` | no | float | Divisor (e.g. `100` for 0–100 scales) | | `source_id_prefix` | no | string | Prefix prepended to extracted `source_id` | +| `transforms` | no | map | Per-field transforms applied post-extraction (see below) | + +**Field transforms.** Each entry in `transforms` maps a field name to a single transform. Only `bps`, `pps`, `confidence` (numeric) and `vector` (string) accept transforms; the daemon rejects the adapter at load time if a transform is attached to a different field or with the wrong shape. + +| `transforms.<field>.type` | Applies to | Other keys | Behavior | +|---|---|---|---| +| `unit_conversion` | `bps`, `pps`, `confidence` | `multiplier` (float, must be finite) | Multiplies the extracted value. Use for `Mbps→bps` (`1_000_000`), `kpps→pps` (`1000`), `%→ratio` (`0.01`). Missing values stay `None`. | +| `regex_extract` | `vector` | `pattern` (string regex), `group` (int, default `0`) | Replaces the extracted string with `captures.get(group).as_str()`. No match ⇒ field is treated as missing and falls through to `vector_map` / `default_vector`. The regex is compiled once at config load. 
| +| `computed` | `bps`, `pps`, `confidence` | `paths` (array of JSONPath), `scale` (float, default `1.0`) | Bypasses the field's primary `fields.<field>` JSONPath. Evaluates `scale * Π(extract(path_i))`. Any path that resolves to null or non-number ⇒ field is `None`. Useful for fields not directly present in the payload (e.g. derive `bps = packets × avg_size × 8`). | + +For `confidence`, the order of operations is: JSONPath extract → transform → `confidence_scale` divisor → clamp to `[0, 1]`. **Authentication:** diff --git a/src/correlation/config.rs b/src/correlation/config.rs index 4f8cf77..bb0e53f 100644 --- a/src/correlation/config.rs +++ b/src/correlation/config.rs @@ -1082,6 +1082,7 @@ sources: default_vector: None, confidence_scale: None, source_id_prefix: None, + transforms: HashMap::new(), } } diff --git a/src/correlation/mod.rs b/src/correlation/mod.rs index 4b6a0f2..315759a 100644 --- a/src/correlation/mod.rs +++ b/src/correlation/mod.rs @@ -5,6 +5,6 @@ pub mod webhook; pub use config::*; pub use engine::*; pub use webhook::{ - CompiledAdapter, MapError, WebhookAdapter, WebhookAuth, WebhookFieldMap, is_valid_name, - map_payload, verify_hmac_sha256, + CompiledAdapter, CompiledTransform, MapError, WebhookAdapter, WebhookAuth, WebhookFieldMap, + WebhookTransform, is_valid_name, map_payload, verify_hmac_sha256, }; diff --git a/src/correlation/webhook.rs b/src/correlation/webhook.rs index a5a8d59..c82f53d 100644 --- a/src/correlation/webhook.rs +++ b/src/correlation/webhook.rs @@ -16,6 +16,7 @@ use chrono::{DateTime, Utc}; use hmac::{Hmac, KeyInit, Mac}; +use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::Value; use serde_json_path::JsonPath; @@ -70,6 +71,66 @@ pub struct WebhookAdapter { /// Optional prefix prepended to extracted `source_id` values (for dedup). #[serde(default, skip_serializing_if = "Option::is_none")] pub source_id_prefix: Option, + + /// Optional per-field transforms applied after JSONPath extraction. 
+ /// The key is the event field name (`bps`, `pps`, `confidence`, `vector`). + /// Transforms run before `vector_map` resolution and before `confidence_scale`. See + /// [`WebhookTransform`]. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub transforms: HashMap, +} + +/// Operator-defined transform applied to an extracted field value. +/// +/// Three variants cover the common shapes that map cleanly to detector +/// payloads in the wild: +/// +/// - `unit_conversion` — multiply a numeric value by a constant (Mbps→bps, +/// kpps→pps, percentage→ratio, etc.). +/// - `regex_extract` — pull a capture group out of a string (extract a vector +/// name from a free-form alert description). +/// - `computed` — replace the extracted value with the product of one or more +/// JSONPath extractions, scaled by a constant (derive `bps` from packets × +/// packet_size × 8). +/// +/// Each field can have at most one transform; transforms are applied +/// post-JSONPath but pre-validation. For `computed`, the field's primary +/// JSONPath is bypassed in favor of the transform's `paths`. Numeric +/// transforms on null/missing values are a no-op (the field stays `None`). +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WebhookTransform { + /// Multiply the extracted numeric value by `multiplier`. Useful for unit + /// conversion: `Mbps → bps` is `multiplier: 1_000_000`. NaN/infinite + /// multipliers are rejected at compile time. + UnitConversion { multiplier: f64 }, + + /// Apply `pattern` to the extracted string value and replace it with the + /// capture group identified by `group` (defaults to 0 = whole match). If + /// the pattern does not match or the group index is out of range, the + /// field is set to `None` (treated as "missing"). The regex is compiled + /// once at adapter registration. 
+ RegexExtract { + pattern: String, + #[serde(default)] + group: usize, + }, + + /// Bypass the primary JSONPath for this field and instead compute its + /// value as `scale * Π(extract(path_i))` where each path resolves to a + /// numeric node. Any path that resolves to null or a non-number causes + /// the field to be `None`. Useful for fields not directly present in the + /// payload: e.g. derive `bps = packets * avg_size_bytes * 8` from a + /// payload that only carries `packets` and `avg_size_bytes`. + Computed { + paths: Vec, + #[serde(default = "default_scale")] + scale: f64, + }, +} + +fn default_scale() -> f64 { + 1.0 } /// JSONPath expressions for each mapped event field. @@ -154,6 +215,26 @@ pub enum MapError { field: &'static str, expected: &'static str, }, + #[error("invalid regex '{pattern}' in transform for field '{field}': {source}")] + InvalidRegex { + field: String, + pattern: String, + source: regex::Error, + }, + #[error("invalid transform multiplier '{value}' for field '{field}' (must be finite)")] + InvalidMultiplier { field: String, value: f64 }, + #[error( + "transform for field '{0}' is not allowed (only bps, pps, confidence, vector supported)" + )] + UnsupportedTransformField(String), + #[error( + "transform variant '{variant}' is not allowed on field '{field}' (expected a {expected} field)" + )] + TransformTypeMismatch { + field: String, + variant: &'static str, + expected: &'static str, + }, } /// Validate that a webhook adapter name is safe for use as a URL path segment. @@ -183,8 +264,20 @@ pub struct CompiledAdapter { pub source_id: Option, pub top_dst_ports: Option, pub action: Option, + pub transforms: HashMap, } +/// Pre-validated transform, with regex/JSONPath pre-parsed. 
+pub enum CompiledTransform { + UnitConversion { multiplier: f64 }, + RegexExtract { regex: Regex, group: usize }, + Computed { paths: Vec, scale: f64 }, +} + +const NUMERIC_FIELDS: &[&str] = &["bps", "pps", "confidence"]; +const STRING_FIELDS: &[&str] = &["vector"]; +const ALLOWED_TRANSFORM_FIELDS: &[&str] = &["bps", "pps", "confidence", "vector"]; + impl CompiledAdapter { pub fn compile(adapter: &WebhookAdapter) -> Result { let compile_opt = @@ -219,10 +312,91 @@ impl CompiledAdapter { source_id: compile_opt("source_id", &adapter.fields.source_id)?, top_dst_ports: compile_opt("top_dst_ports", &adapter.fields.top_dst_ports)?, action: compile_opt("action", &adapter.fields.action)?, + transforms: compile_transforms(&adapter.transforms)?, }) } } +fn compile_transforms( + raw: &HashMap, +) -> Result, MapError> { + let mut out = HashMap::with_capacity(raw.len()); + for (field, transform) in raw { + if !ALLOWED_TRANSFORM_FIELDS.contains(&field.as_str()) { + return Err(MapError::UnsupportedTransformField(field.clone())); + } + let compiled = match transform { + WebhookTransform::UnitConversion { multiplier } => { + if !NUMERIC_FIELDS.contains(&field.as_str()) { + return Err(MapError::TransformTypeMismatch { + field: field.clone(), + variant: "unit_conversion", + expected: "numeric (bps, pps, or confidence)", + }); + } + if !multiplier.is_finite() { + return Err(MapError::InvalidMultiplier { + field: field.clone(), + value: *multiplier, + }); + } + CompiledTransform::UnitConversion { + multiplier: *multiplier, + } + } + WebhookTransform::RegexExtract { pattern, group } => { + if !STRING_FIELDS.contains(&field.as_str()) { + return Err(MapError::TransformTypeMismatch { + field: field.clone(), + variant: "regex_extract", + expected: "string (vector)", + }); + } + let regex = Regex::new(pattern).map_err(|source| MapError::InvalidRegex { + field: field.clone(), + pattern: pattern.clone(), + source, + })?; + CompiledTransform::RegexExtract { + regex, + group: *group, + } + } + 
WebhookTransform::Computed { paths, scale } => { + if !NUMERIC_FIELDS.contains(&field.as_str()) { + return Err(MapError::TransformTypeMismatch { + field: field.clone(), + variant: "computed", + expected: "numeric (bps, pps, or confidence)", + }); + } + if !scale.is_finite() { + return Err(MapError::InvalidMultiplier { + field: field.clone(), + value: *scale, + }); + } + let compiled_paths: Vec = paths + .iter() + .map(|p| { + JsonPath::parse(p).map_err(|source| MapError::InvalidPath { + field: field.clone(), + path: p.clone(), + source, + }) + }) + .collect::>()?; + CompiledTransform::Computed { + paths: compiled_paths, + scale: *scale, + } + } + }; + out.insert(field.clone(), compiled); + } + Ok(out) +} + /// Extract zero or more `AttackEventInput`s from a payload using adapter rules. /// /// If the adapter has a `root_path`, each matching JSON node produces one event. @@ -287,21 +461,23 @@ fn map_one( .as_ref() .and_then(|p| p.query(node).at_most_one().ok().flatten()) .and_then(|v| v.as_str().map(|s| s.to_string())); - resolve_vector(adapter, raw.as_deref()) + let transformed = apply_string_transform(raw, compiled.transforms.get("vector")); + resolve_vector(adapter, transformed.as_deref()) }; - let bps = extract_i64(&compiled.bps, node)?; - let pps = extract_i64(&compiled.pps, node)?; + let bps = extract_numeric(&compiled.bps, node, compiled.transforms.get("bps"))?; + let pps = extract_numeric(&compiled.pps, node, compiled.transforms.get("pps"))?; let confidence_raw = compiled .confidence .as_ref() .and_then(|p| p.query(node).at_most_one().ok().flatten()) - .and_then(|v| v.as_f64()) - .map(|x| x as f32); - let confidence = confidence_raw.map(|c| { - let scaled = adapter.confidence_scale.map(|s| c / s).unwrap_or(c); - scaled.clamp(0.0, 1.0) + .and_then(|v| v.as_f64()); + let confidence_after_transform = + apply_numeric_transform(confidence_raw, node, compiled.transforms.get("confidence")); + let confidence = confidence_after_transform.map(|c| { + let scaled = 
adapter.confidence_scale.map(|s| c / s as f64).unwrap_or(c); + (scaled as f32).clamp(0.0, 1.0) }); let event_id = compiled @@ -361,17 +537,72 @@ fn map_one( }) } -fn extract_i64(path: &Option, node: &Value) -> Result, MapError> { - let Some(p) = path.as_ref() else { - return Ok(None); - }; - let Some(v) = p.query(node).at_most_one().ok().flatten() else { - return Ok(None); - }; - match v { - Value::Number(n) => Ok(n.as_i64().or_else(|| n.as_f64().map(|f| f as i64))), - Value::Null => Ok(None), - _ => Ok(None), +fn extract_numeric( + path: &Option, + node: &Value, + transform: Option<&CompiledTransform>, +) -> Result, MapError> { + // Computed transforms bypass the primary JSONPath entirely. + if let Some(CompiledTransform::Computed { paths, scale }) = transform { + return Ok(compute_product(paths, *scale, node).map(|f| f as i64)); + } + + let raw_f64 = path + .as_ref() + .and_then(|p| p.query(node).at_most_one().ok().flatten()) + .and_then(|v| match v { + Value::Number(n) => n.as_f64(), + _ => None, + }); + + let after = apply_numeric_transform(raw_f64, node, transform); + Ok(after.map(|f| f as i64)) +} + +fn apply_numeric_transform( + raw: Option, + node: &Value, + transform: Option<&CompiledTransform>, +) -> Option { + match transform { + None => raw, + Some(CompiledTransform::UnitConversion { multiplier }) => raw.map(|v| v * multiplier), + Some(CompiledTransform::Computed { paths, scale }) => compute_product(paths, *scale, node), + // RegexExtract is rejected at compile time for numeric fields. + Some(CompiledTransform::RegexExtract { .. 
}) => raw, + } +} + +fn apply_string_transform( + raw: Option, + transform: Option<&CompiledTransform>, +) -> Option { + match (raw, transform) { + (raw, None) => raw, + (Some(s), Some(CompiledTransform::RegexExtract { regex, group })) => regex + .captures(&s) + .and_then(|caps| caps.get(*group)) + .map(|m| m.as_str().to_string()), + (None, _) => None, + // UnitConversion / Computed are rejected at compile time for string fields. + (raw, Some(_)) => raw, + } +} + +fn compute_product(paths: &[JsonPath], scale: f64, node: &Value) -> Option { + let mut product = scale; + for p in paths { + let v = p.query(node).at_most_one().ok().flatten()?; + let n = match v { + Value::Number(n) => n.as_f64()?, + _ => return None, + }; + product *= n; + } + if product.is_finite() { + Some(product) + } else { + None } } @@ -445,6 +676,7 @@ mod tests { default_vector: None, confidence_scale: None, source_id_prefix: None, + transforms: HashMap::new(), } } @@ -686,4 +918,327 @@ mod tests { assert!(!verify_hmac_sha256(b"key", b"body", "not-hex!!")); assert!(!verify_hmac_sha256(b"key", b"body", "")); } + + #[test] + fn unit_conversion_scales_bps() { + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "bps".into(), + WebhookTransform::UnitConversion { + multiplier: 1_000_000.0, + }, + ); + let compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ + "target": { "ip": "203.0.113.5" }, + "metrics": { "bps": 250, "pps": 50 }, + }); + let event = map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + assert_eq!(event.bps, Some(250_000_000)); + assert_eq!(event.pps, Some(50)); // untransformed + } + + #[test] + fn unit_conversion_scales_pps_and_confidence() { + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "pps".into(), + WebhookTransform::UnitConversion { multiplier: 1000.0 }, + ); + adapter.transforms.insert( + "confidence".into(), + WebhookTransform::UnitConversion { multiplier: 0.01 }, + ); + let 
compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ + "target": { "ip": "203.0.113.5" }, + "metrics": { "bps": 1, "pps": 7 }, + "score": 42, + }); + let event = map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + assert_eq!(event.pps, Some(7_000)); + assert_eq!(event.confidence, Some(0.42)); + } + + #[test] + fn unit_conversion_passes_through_missing_values() { + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "bps".into(), + WebhookTransform::UnitConversion { + multiplier: 1_000_000.0, + }, + ); + let compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ "target": { "ip": "203.0.113.5" } }); + let event = map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + assert_eq!(event.bps, None); + } + + #[test] + fn regex_extract_pulls_vector_from_description() { + let mut adapter = basic_adapter(); + adapter.fields.vector = Some("$.description".into()); + adapter.transforms.insert( + "vector".into(), + WebhookTransform::RegexExtract { + pattern: r"(\w+)_flood".into(), + group: 0, + }, + ); + let compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ + "target": { "ip": "203.0.113.5" }, + "description": "Detected: udp_flood at 250Gbps targeting customer-42", + }); + let event = map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + assert_eq!(event.vector, AttackVector::UdpFlood); + } + + #[test] + fn regex_extract_uses_named_group() { + let mut adapter = basic_adapter(); + adapter.fields.vector = Some("$.description".into()); + adapter.transforms.insert( + "vector".into(), + WebhookTransform::RegexExtract { + pattern: r"type=(?P\w+_flood)".into(), + group: 1, + }, + ); + let compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ + "target": { "ip": "203.0.113.5" }, + "description": "alert id=42 type=syn_flood severity=high", + }); + let event = 
map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + assert_eq!(event.vector, AttackVector::SynFlood); + } + + #[test] + fn regex_extract_no_match_falls_back_to_default() { + let mut adapter = basic_adapter(); + adapter.fields.vector = Some("$.description".into()); + adapter.default_vector = Some("unknown".into()); + adapter.transforms.insert( + "vector".into(), + WebhookTransform::RegexExtract { + pattern: r"(\w+)_flood".into(), + group: 0, + }, + ); + let compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ + "target": { "ip": "203.0.113.5" }, + "description": "benign traffic spike on customer-7", + }); + let event = map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + assert_eq!(event.vector, AttackVector::Unknown); + } + + #[test] + fn computed_derives_bps_from_packets_and_size() { + let mut adapter = basic_adapter(); + adapter.fields.bps = None; + adapter.transforms.insert( + "bps".into(), + WebhookTransform::Computed { + paths: vec!["$.packets".into(), "$.avg_size".into()], + scale: 8.0, + }, + ); + let compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ + "target": { "ip": "203.0.113.5" }, + "packets": 1_000_000, + "avg_size": 512, + }); + let event = map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + assert_eq!(event.bps, Some(4_096_000_000)); + } + + #[test] + fn computed_missing_path_yields_none() { + let mut adapter = basic_adapter(); + adapter.fields.bps = None; + adapter.transforms.insert( + "bps".into(), + WebhookTransform::Computed { + paths: vec!["$.packets".into(), "$.missing".into()], + scale: 8.0, + }, + ); + let compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ + "target": { "ip": "203.0.113.5" }, + "packets": 100, + }); + let event = map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + assert_eq!(event.bps, None); 
+ } + + #[test] + fn invalid_regex_rejected_at_compile_time() { + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "vector".into(), + WebhookTransform::RegexExtract { + pattern: "[".into(), + group: 0, + }, + ); + assert!(matches!( + CompiledAdapter::compile(&adapter), + Err(MapError::InvalidRegex { .. }) + )); + } + + #[test] + fn non_finite_multiplier_rejected_at_compile_time() { + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "bps".into(), + WebhookTransform::UnitConversion { + multiplier: f64::NAN, + }, + ); + assert!(matches!( + CompiledAdapter::compile(&adapter), + Err(MapError::InvalidMultiplier { .. }) + )); + } + + #[test] + fn unsupported_transform_field_rejected() { + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "victim_ip".into(), + WebhookTransform::UnitConversion { multiplier: 1.0 }, + ); + assert!(matches!( + CompiledAdapter::compile(&adapter), + Err(MapError::UnsupportedTransformField(_)) + )); + } + + #[test] + fn transform_type_mismatch_rejected() { + // unit_conversion on string field + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "vector".into(), + WebhookTransform::UnitConversion { multiplier: 1.0 }, + ); + assert!(matches!( + CompiledAdapter::compile(&adapter), + Err(MapError::TransformTypeMismatch { .. }) + )); + + // regex_extract on numeric field + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "bps".into(), + WebhookTransform::RegexExtract { + pattern: r"\d+".into(), + group: 0, + }, + ); + assert!(matches!( + CompiledAdapter::compile(&adapter), + Err(MapError::TransformTypeMismatch { .. }) + )); + } + + #[test] + fn unit_conversion_after_confidence_scale_clamps() { + // Operator sets confidence_scale=100 (percentage -> ratio) AND a + // unit_conversion multiplier that would push the value past 1.0. + // Clamping happens after both, so the final value is 1.0. 
+ let mut adapter = basic_adapter(); + adapter.confidence_scale = Some(100.0); + adapter.transforms.insert( + "confidence".into(), + WebhookTransform::UnitConversion { multiplier: 10.0 }, + ); + let compiled = CompiledAdapter::compile(&adapter).unwrap(); + let body = json!({ + "target": { "ip": "203.0.113.5" }, + "score": 50, + }); + let event = map_payload(&adapter, &compiled, &body) + .into_iter() + .next() + .unwrap() + .unwrap(); + // raw=50, transform: *10 = 500, scale: /100 = 5.0, clamp: 1.0 + assert_eq!(event.confidence, Some(1.0)); + } + + #[test] + fn transforms_round_trip_through_yaml() { + let mut adapter = basic_adapter(); + adapter.transforms.insert( + "bps".into(), + WebhookTransform::UnitConversion { + multiplier: 1_000_000.0, + }, + ); + adapter.transforms.insert( + "vector".into(), + WebhookTransform::RegexExtract { + pattern: r"(\w+)_flood".into(), + group: 0, + }, + ); + adapter.transforms.insert( + "pps".into(), + WebhookTransform::Computed { + paths: vec!["$.x".into(), "$.y".into()], + scale: 2.0, + }, + ); + let yaml = serde_yaml::to_string(&adapter).unwrap(); + assert!(yaml.contains("unit_conversion")); + assert!(yaml.contains("regex_extract")); + assert!(yaml.contains("computed")); + let parsed: WebhookAdapter = serde_yaml::from_str(&yaml).unwrap(); + assert_eq!(parsed.transforms.len(), 3); + // Sanity-check one round-tripped value. 
+ match parsed.transforms.get("bps").unwrap() { + WebhookTransform::UnitConversion { multiplier } => assert_eq!(*multiplier, 1e6), + _ => panic!("expected UnitConversion"), + } + } } diff --git a/tests/integration.rs b/tests/integration.rs index 938a4de..8b42dc6 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -4767,6 +4767,7 @@ fn basic_webhook_adapter(name: &str) -> prefixd::correlation::WebhookAdapter { default_vector: None, confidence_scale: None, source_id_prefix: None, + transforms: Default::default(), } } @@ -4982,6 +4983,136 @@ async fn test_webhook_vector_map_and_scaling() { assert_eq!(json["processed"], 1); } +#[tokio::test] +async fn test_webhook_unit_conversion_transform_end_to_end() { + use prefixd::correlation::WebhookTransform; + let mut adapter = basic_webhook_adapter("megabit"); + adapter.transforms.insert( + "bps".into(), + WebhookTransform::UnitConversion { + multiplier: 1_000_000.0, + }, + ); + let app = setup_app_with_webhooks(vec![adapter]).await; + + // Detector reports bps=250 (intended Mbps); transform should yield 250M. + let body = r#"{ + "id":"u1", + "ip":"203.0.113.5", + "vector":"udp_flood", + "bps":250, + "pps":1 + }"#; + let (status, json) = post_webhook(&app, "megabit", body, &[]).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(json["processed"], 1); + // Verify the transform actually reshaped the bps value by inspecting the + // ingested event surfaced via the events API. 
+ let req = axum::http::Request::builder() + .method("GET") + .uri("/v1/events") + .body(axum::body::Body::empty()) + .unwrap(); + let resp = tower::ServiceExt::oneshot(app, req).await.unwrap(); + let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let events: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + let arr = events["events"].as_array().expect("events array"); + let found = arr + .iter() + .find(|e| e["victim_ip"] == "203.0.113.5") + .expect("ingested event"); + assert_eq!(found["bps"], 250_000_000); +} + +#[tokio::test] +async fn test_webhook_regex_extract_transform_end_to_end() { + use prefixd::correlation::WebhookTransform; + let mut adapter = basic_webhook_adapter("regex"); + adapter.fields.vector = Some("$.description".into()); + adapter.default_vector = Some("unknown".into()); + adapter.transforms.insert( + "vector".into(), + WebhookTransform::RegexExtract { + pattern: r"(\w+)_flood".into(), + group: 0, + }, + ); + let app = setup_app_with_webhooks(vec![adapter]).await; + + let body = r#"{ + "id":"r1", + "ip":"203.0.113.6", + "description":"Customer 12 hit by udp_flood at 50Gbps" + }"#; + let (status, json) = post_webhook(&app, "regex", body, &[]).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(json["processed"], 1); + + let req = axum::http::Request::builder() + .method("GET") + .uri("/v1/events") + .body(axum::body::Body::empty()) + .unwrap(); + let resp = tower::ServiceExt::oneshot(app, req).await.unwrap(); + let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let events: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + let arr = events["events"].as_array().expect("events array"); + let found = arr + .iter() + .find(|e| e["victim_ip"] == "203.0.113.6") + .expect("ingested event"); + assert_eq!(found["vector"], "udp_flood"); +} + +#[tokio::test] +async fn test_webhook_computed_transform_end_to_end() { + use 
prefixd::correlation::WebhookTransform; + let mut adapter = basic_webhook_adapter("computed"); + adapter.fields.bps = None; + adapter.transforms.insert( + "bps".into(), + WebhookTransform::Computed { + paths: vec!["$.packets".into(), "$.avg_size".into()], + scale: 8.0, + }, + ); + let app = setup_app_with_webhooks(vec![adapter]).await; + + let body = r#"{ + "id":"c1", + "ip":"203.0.113.7", + "vector":"udp_flood", + "packets": 500000, + "avg_size": 1024, + "pps": 100 + }"#; + let (status, json) = post_webhook(&app, "computed", body, &[]).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(json["processed"], 1); + + let req = axum::http::Request::builder() + .method("GET") + .uri("/v1/events") + .body(axum::body::Body::empty()) + .unwrap(); + let resp = tower::ServiceExt::oneshot(app, req).await.unwrap(); + let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .unwrap(); + let events: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + let arr = events["events"].as_array().expect("events array"); + let found = arr + .iter() + .find(|e| e["victim_ip"] == "203.0.113.7") + .expect("ingested event"); + // 500_000 * 1024 * 8 = 4_096_000_000 + assert_eq!(found["bps"], 4_096_000_000_i64); +} + // ── Corroborating signals (ADR 021) ───────────────────────────────── async fn setup_app_with_corroborating_source(