diff --git a/Cargo.lock b/Cargo.lock index 7c431b6f44..8e2142c732 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1353,9 +1353,14 @@ checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" name = "datadog-ffe" version = "1.0.0" dependencies = [ + "bincode", "chrono", "derive_more", "faststr", + "http", + "httpmock", + "libdd-capabilities", + "libdd-capabilities-impl", "libdd-common", "libdd-remote-config", "libdd-trace-protobuf", @@ -1370,6 +1375,7 @@ dependencies = [ "serde_json", "serde_with", "thiserror 2.0.17", + "tokio", "url", ] diff --git a/datadog-ffe-test-suite/Cargo.toml b/datadog-ffe-test-suite/Cargo.toml index 3ab4adb0c7..4711f1df29 100644 --- a/datadog-ffe-test-suite/Cargo.toml +++ b/datadog-ffe-test-suite/Cargo.toml @@ -11,7 +11,7 @@ publish = false bench = false [dev-dependencies] -datadog-ffe = { path = "../datadog-ffe" } +datadog-ffe = { path = "../datadog-ffe", features = ["flagevaluation-evp"] } chrono = { version = "0.4.38", default-features = false, features = ["now", "serde"] } criterion = { version = "0.5", features = ["html_reports"] } env_logger = "0.10" diff --git a/datadog-ffe-test-suite/benches/eval.rs b/datadog-ffe-test-suite/benches/eval.rs index ac6b8fb3a7..94cc27e2ba 100644 --- a/datadog-ffe-test-suite/benches/eval.rs +++ b/datadog-ffe-test-suite/benches/eval.rs @@ -1,9 +1,21 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use criterion::{black_box, criterion_group, criterion_main, Bencher, Criterion}; +use criterion::{ + black_box, criterion_group, criterion_main, BatchSize, Bencher, Criterion, Throughput, +}; +use datadog_ffe::telemetry::flagevaluation::{ + encode_flag_evaluation_payloads, AllocationKey, ContextDD, FfeFlagEvaluationBatch, + FfeFlagEvaluationEvent, FlagEvalEventContext, FlagEvaluationEvpCoalescer, FlagKey, VariantKey, + EVP_PAYLOAD_SIZE_LIMIT, +}; +use datadog_ffe::telemetry::FfeTelemetryContext; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, fs, sync::Arc}; +use std::{ + collections::{BTreeMap, HashMap}, + fs, + sync::Arc, +}; use datadog_ffe::rules_based::{ get_assignment, Attribute, Configuration, EvaluationContext, ExpectedFlagType, FlagType, Str, @@ -140,5 +152,142 @@ fn bench_single_flag(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, bench_sdk_test_data, bench_single_flag); +#[derive(Clone, Copy)] +struct FlagEvalBenchProfile { + name: &'static str, + num_flags: usize, + num_users: usize, + num_fields: usize, +} + +const FLAG_EVAL_BENCH_PROFILES: [FlagEvalBenchProfile; 3] = [ + FlagEvalBenchProfile { + name: "typical/100flags_50users_10fields", + num_flags: 100, + num_users: 50, + num_fields: 10, + }, + FlagEvalBenchProfile { + name: "stress/10flags_1000users_250fields", + num_flags: 10, + num_users: 1_000, + num_fields: 250, + }, + FlagEvalBenchProfile { + name: "scale/2500flags_500users_20fields", + num_flags: 2_500, + num_users: 500, + num_fields: 20, + }, +]; + +fn flag_eval_context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "bench-service".to_string(), + env: "ci".to_string(), + version: "1.0.0".to_string(), + } +} + +fn flag_eval_attrs(num_fields: usize) -> String { + let attrs = (0..num_fields) + .map(|i| { + ( + format!("field{i}"), + serde_json::Value::String("value".to_string()), + ) + }) + .collect::>(); + serde_json::to_string(&attrs).expect("benchmark attrs must encode") +} + +fn flag_eval_events(profile: FlagEvalBenchProfile) -> Vec { + let attrs = flag_eval_attrs(profile.num_fields); + let cycle_count = profile.num_flags.max(profile.num_users); + (0..cycle_count) + .map(|i| FfeFlagEvaluationEvent { + timestamp: 1_760_000_000_000, + flag: FlagKey { + key: format!("bench-flag-{}", i % profile.num_flags), + }, + first_evaluation: 1_760_000_000_000 + i as i64, + last_evaluation: 1_760_000_000_000 + i as i64, + evaluation_count: 1, + variant: Some(VariantKey { + key: format!("variant-{}", i % 4), + }), + allocation: Some(AllocationKey { + key: format!("alloc-{}", i % profile.num_flags), + }), + targeting_rule: None, + targeting_key: Some(format!("bench-user-{}", i % profile.num_users)), + context: Some(FlagEvalEventContext { + evaluation: Some(attrs.clone()), + dd: Some(ContextDD { + service: "bench-service".to_string(), + }), + }), + error: None, + runtime_default_used: false, + }) + .collect() +} + +fn flag_eval_batch(profile: FlagEvalBenchProfile) -> FfeFlagEvaluationBatch { + FfeFlagEvaluationBatch { + context: flag_eval_context(), + flag_evaluations: flag_eval_events(profile), + } +} + +fn bench_flagevaluation_evp_coalescer(c: &mut Criterion) { + let mut group = c.benchmark_group("flagevaluation_evp/coalescer"); + for profile in FLAG_EVAL_BENCH_PROFILES { + let batch = flag_eval_batch(profile); + group.throughput(Throughput::Elements(batch.flag_evaluations.len() as u64)); + group.bench_function(profile.name, |b| { + b.iter_batched( + || batch.clone(), + |batch| { + let coalescer = FlagEvaluationEvpCoalescer::::default(); + coalescer.enqueue("agent".to_string(), black_box(batch)); + let batches = coalescer.take_batches(); + coalescer.finish_flush_cycle(); + black_box(batches); + }, + BatchSize::SmallInput, + ) + }); + } + group.finish(); +} + +fn bench_flagevaluation_evp_payloads(c: &mut Criterion) { + let mut group = c.benchmark_group("flagevaluation_evp/payloads"); + for profile in FLAG_EVAL_BENCH_PROFILES { + let batch = flag_eval_batch(profile); + group.throughput(Throughput::Elements(batch.flag_evaluations.len() as u64)); + group.bench_function(profile.name, |b| { + b.iter_batched( + || batch.clone(), + |batch| { + let payloads = + encode_flag_evaluation_payloads(black_box(batch), EVP_PAYLOAD_SIZE_LIMIT) + .expect("benchmark payload should encode"); + black_box(payloads); + }, + BatchSize::SmallInput, + ) + }); + } + group.finish(); +} + +criterion_group!( + benches, + bench_sdk_test_data, + bench_single_flag, + bench_flagevaluation_evp_coalescer, + bench_flagevaluation_evp_payloads +); criterion_main!(benches); diff --git a/datadog-ffe/Cargo.toml b/datadog-ffe/Cargo.toml index 808aca6885..069a493612 100644 --- a/datadog-ffe/Cargo.toml +++ b/datadog-ffe/Cargo.toml @@ -20,6 +20,9 @@ derive_more = { version = "2.0.0", default-features = false, features = ["from", log = { version = "0.4.21", default-features = false, features = ["kv", "kv_serde"] } md5 = { version = "0.7.0", default-features = false } libdd-common = { version = "5.0.0", path = "../libdd-common", default-features = false, features = ["require-regex-full"] } +libdd-capabilities = { path = "../libdd-capabilities", version = "2.0.0", optional = true } +http = { version = "1.1", optional = true } +tokio = { version = "1.49.0", features = ["time"], optional = true } semver = "1.0" serde-bool = { version = "0.1.3", default-features = false } serde_with = { version = "3.11.0", default-features = false, features = ["base64", "hex", "macros"] } @@ -30,9 +33,18 @@ libdd-trace-protobuf = { path = "../libdd-trace-protobuf", optional = true } prost = { version = "0.14.1", optional = true } pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] } +[dev-dependencies] +# Matches the sidecar's bincode version. Used by the flagevaluation bincode +# round-trip test, which guards against `skip_serializing_if` reintroducing the +# worker→sidecar IPC field-misalignment bug (bincode is non-self-describing). +bincode = { version = "1.3.3" } +httpmock = "0.8.0-alpha.1" +libdd-capabilities-impl = { path = "../libdd-capabilities-impl" } + [features] default = ["remote-config"] exposure-events = ["dep:lru"] evaluation-metrics = ["dep:libdd-trace-protobuf", "dep:prost"] +flagevaluation-evp = ["dep:http", "dep:libdd-capabilities", "dep:tokio"] pyo3 = ["dep:pyo3"] remote-config = ["dep:libdd-remote-config"] diff --git a/datadog-ffe/src/lib.rs b/datadog-ffe/src/lib.rs index bfa5ea72e6..ddbac1d2e7 100644 --- a/datadog-ffe/src/lib.rs +++ b/datadog-ffe/src/lib.rs @@ -6,7 +6,11 @@ mod flag_type; #[cfg(feature = "remote-config")] mod remote_config; pub mod rules_based; -#[cfg(any(feature = "exposure-events", feature = "evaluation-metrics"))] +#[cfg(any( + feature = "exposure-events", + feature = "evaluation-metrics", + feature = "flagevaluation-evp" +))] pub mod telemetry; pub use flag_type::{ExpectedFlagType, FlagType}; diff --git a/datadog-ffe/src/telemetry/flagevaluation.rs b/datadog-ffe/src/telemetry/flagevaluation.rs new file mode 100644 index 0000000000..01acefefca --- /dev/null +++ b/datadog-ffe/src/telemetry/flagevaluation.rs @@ -0,0 +1,1615 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Reusable EVP flagevaluation payload, coalescing, serialization, and sender +//! primitives for the `flageval-worker` ingestion schema. +//! +//! Two-tier aggregation (full → degraded → drop-counted), context pruning, +//! payload-limit degradation, JSON POST encoding, and Agent EVP proxy sending +//! live here so native FFE consumers can share the same behavior independent of +//! sidecar dispatch. +//! +//! Serialization note (bincode wire vs EVP POST): these types cross the +//! worker→sidecar IPC boundary, which is encoded with **bincode** — a +//! non-self-describing format whose derived `Deserialize` reads every field in +//! declaration order. `#[serde(skip_serializing_if = ...)]` is therefore +//! **incompatible** with the bincode wire: a skipped field is omitted on +//! serialize but still expected on deserialize, causing field misalignment and +//! a dropped batch. For that reason **all fields here are always serialized** +//! (no `skip_serializing_if`). The flageval-worker EVP schema rejects null / +//! empty placeholders (especially for degraded-tier events), so +//! [`encode_flag_evaluation_payloads`] strips null / empty placeholder entries +//! from the JSON before the HTTP POST, reproducing the old skip semantics only +//! on the outbound wire. `#[serde(default)]` is kept on fields that have it for +//! deserialize robustness. + +use super::FfeTelemetryContext; +use serde::{Deserialize, Serialize}; +use std::collections::{BTreeMap, HashMap}; +use std::hash::Hash; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; + +mod sender; +pub use sender::{ + flagevaluation_agent_proxy_endpoint, send_flag_evaluation_batch, FlagEvaluationEvpSendConfig, + EVP_FLAGEVALUATION_PATH, EVP_PAYLOAD_SIZE_LIMIT, EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE, +}; + +// ── Aggregation caps ──────────────────────────────────────────────────────── +pub const EVAL_SCALE_TARGET_FLAGS: usize = 2_500; +pub const EVAL_SCALE_FULL_BUCKETS_PER_FLAG: usize = 50; +pub const EVAL_SCALE_USERS_PER_FLAG: usize = 1_000; +pub const EVAL_SCALE_PER_FLAG_HEADROOM_MULTIPLIER: usize = 10; +pub const EVAL_SCALE_DEGRADED_BUCKETS_PER_FLAG: usize = 10; +pub const EVAL_SCALE_FULL_BUCKET_TARGET: usize = + EVAL_SCALE_TARGET_FLAGS * EVAL_SCALE_FULL_BUCKETS_PER_FLAG; +pub const EVAL_SCALE_PER_FLAG_BUCKET_TARGET: usize = + EVAL_SCALE_PER_FLAG_HEADROOM_MULTIPLIER * EVAL_SCALE_USERS_PER_FLAG; +pub const EVAL_SCALE_DEGRADED_BUCKET_TARGET: usize = + EVAL_SCALE_TARGET_FLAGS * EVAL_SCALE_DEGRADED_BUCKETS_PER_FLAG; +/// Maximum number of distinct full-tier buckets across all flags. +pub const GLOBAL_CAP: usize = 131_072; +/// Maximum number of full-tier buckets for a single flag. +pub const PER_FLAG_CAP: usize = EVAL_SCALE_PER_FLAG_BUCKET_TARGET; +/// Maximum number of distinct degraded-tier buckets across all flags. +pub const DEGRADED_CAP: usize = 32_768; +/// Maximum number of aggregated rows to include in one EVP POST body. +pub const MAX_EVENTS_PER_POST: usize = 512; + +// ── Context pruning bounds ─────────────────────────────────────────────────── +/// Maximum number of context fields to include in a full-tier event. +pub const MAX_CONTEXT_FIELDS: usize = 256; +/// Maximum byte length of a context field value string. Values exceeding this +/// are skipped entirely (not truncated) to avoid partial-data misattribution. +pub const MAX_FIELD_LENGTH: usize = 256; +/// Maximum nested context path depth accepted from FFI callers. A scalar at +/// `a.b.c.d` is retained; fields nested below that depth are skipped. +pub const MAX_CONTEXT_DEPTH: usize = 4; + +// ── Top-level batch ────────────────────────────────────────────────────────── + +/// Batch wrapper for EVP flagevaluation events. +/// +/// Serializes to: +/// ```json +/// { "context": { "service": "…", "env": "…", "version": "…" }, +/// "flagEvaluations": [ … ] } +/// ``` +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeFlagEvaluationBatch { + pub context: FfeTelemetryContext, + #[serde(rename = "flagEvaluations")] + pub flag_evaluations: Vec, +} + +// ── Per-event payload ──────────────────────────────────────────────────────── + +/// A single aggregated flag evaluation event. +/// +/// **All fields are always serialized** (no `skip_serializing_if`) so the type +/// is safe over the non-self-describing bincode IPC wire (see the module-level +/// serialization note). The degraded tier therefore serializes optional fields +/// as `null`/`false` on the wire; the EVP payload encoder +/// ([`encode_flag_evaluation_payloads`]) strips those null/empty placeholders +/// before the EVP POST so the flageval-worker schema sees no null placeholders. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeFlagEvaluationEvent { + /// Unix timestamp of the aggregation window (milliseconds). + pub timestamp: i64, + /// Required: the flag key. + pub flag: FlagKey, + /// Earliest evaluation in this bucket (milliseconds since epoch). + pub first_evaluation: i64, + /// Latest evaluation in this bucket (milliseconds since epoch). + pub last_evaluation: i64, + /// Number of evaluations folded into this bucket. + pub evaluation_count: u64, + + // Optional fields — present in the full tier, `None` in the degraded tier. + // Serialized as `null` on the bincode wire; the flusher strips them. + /// Variant key; absent when the evaluation returned the runtime default + /// (no variant assigned). + #[serde(default)] + pub variant: Option, + /// Allocation key from the UFC rule that produced this evaluation. + #[serde(default)] + pub allocation: Option, + /// Targeting rule key from UFC metadata. Omit when no real rule metadata exists. + #[serde(default)] + pub targeting_rule: Option, + /// Targeting key identifying the evaluation subject. + #[serde(default)] + pub targeting_key: Option, + /// Pruned evaluation context (≤256 fields, values ≤256 chars, skip-not-truncate). + #[serde(default)] + pub context: Option, + /// Evaluation error, if any. + #[serde(default)] + pub error: Option, + + // Optional field — may appear in either tier. + /// `true` when the evaluation returned the SDK runtime default (absent + /// variant, not a UFC-assigned variant). Serialized as `false` on the wire + /// when unset; the flusher strips the `false` placeholder before the POST. + /// `#[serde(default)]` keeps deserialization robust when the field is absent. + #[serde(default)] + pub runtime_default_used: bool, +} + +// ── Field sub-types ────────────────────────────────────────────────────────── + +/// Holds the flag key for the `flag` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FlagKey { + pub key: String, +} + +/// Holds the variant key for the `variant` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct VariantKey { + pub key: String, +} + +/// Holds the allocation key for the `allocation` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct AllocationKey { + pub key: String, +} + +/// Holds the targeting rule key for the `targeting_rule` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct TargetingRuleKey { + pub key: String, +} + +/// Holds the error message for the `error` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct EvalError { + pub message: String, +} + +/// Per-event context object. +/// +/// `evaluation` carries the pruned context attributes; `dd.service` carries the +/// originating service name for cross-service attribution. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FlagEvalEventContext { + /// Pruned evaluation context attributes (≤256 fields, values ≤256 chars), + /// carried over the wire as a **JSON-object string** (e.g. `{"plan":"premium"}`). + /// + /// The sidecar IPC codec is bincode, which cannot (de)serialize + /// `serde_json::Value` (it relies on `deserialize_any`, which bincode + /// rejects). To keep the bincode wire encodable, the pruned context is + /// stringified at event-build time and re-expanded into a JSON object by the + /// shared EVP encoder ([`encode_flag_evaluation_payloads`]) before the EVP + /// POST, so the on-the-wire EVP schema (`context.evaluation` as an object) + /// is unchanged. `Eq` is preserved because `String` is `Eq`. + /// + /// Always serialized (no `skip_serializing_if`) for bincode-wire safety; + /// the EVP payload encoder strips it when `None` → `null`. + #[serde(default)] + pub evaluation: Option, + /// Datadog-specific context sub-object. Always serialized for bincode-wire + /// safety; the flusher strips it when `None` → `null` (and recursively + /// removes the enclosing `context` object if it becomes empty). + #[serde(default)] + pub dd: Option, +} + +/// Datadog-specific context fields inside the per-event `context.dd` object. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct ContextDD { + /// Originating service name. Always serialized for bincode-wire safety; the + /// flusher strips it when empty (`""`), reproducing the old + /// `skip_serializing_if = "String::is_empty"` semantics on the POST. + #[serde(default)] + pub service: String, +} + +// ── Context pruning ────────────────────────────────────────────────────────── + +/// Prune evaluation context attributes to satisfy the flagevaluation bounds: +/// - At most `MAX_CONTEXT_FIELDS` (256) entries are kept. +/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** (not truncated) to +/// avoid partial-data misattribution. +/// - Non-string values (bool, number, null) are kept regardless of their display length. +/// - Keys are iterated in sorted order for deterministic canonical-key stability; the returned +/// `BTreeMap` preserves that order. +pub fn prune_context( + attrs: &BTreeMap, +) -> BTreeMap { + attrs + .iter() + .filter(|(_, v)| { + // Skip string values that exceed the per-field byte limit. + if let serde_json::Value::String(s) = v { + s.len() <= MAX_FIELD_LENGTH + } else { + true + } + }) + .take(MAX_CONTEXT_FIELDS) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() +} + +#[derive(Default)] +pub struct FlagEvaluationEvpPayloadBuildResult { + pub payloads: Vec, + pub dropped_oversized_rows: u64, + pub degraded_oversized_rows: u64, + pub payload_splits: u64, +} + +#[derive(Default)] +pub struct FlagEvaluationEvpWriterStats { + pub rows_dropped_degraded_cap: u64, + pub rows_dropped_payload_limit: u64, + pub rows_degraded_cardinality_cap: u64, + pub rows_degraded_payload_limit: u64, + pub payload_splits: u64, +} + +#[derive(Default)] +struct FlagEvaluationEvpWriterCounters { + rows_dropped_degraded_cap: AtomicU64, + rows_dropped_payload_limit: AtomicU64, + rows_degraded_cardinality_cap: AtomicU64, + rows_degraded_payload_limit: AtomicU64, + payload_splits: AtomicU64, +} + +impl FlagEvaluationEvpWriterCounters { + fn add_rows_dropped_degraded_cap(&self, count: u64) { + add_counter(&self.rows_dropped_degraded_cap, count); + } + + fn add_rows_dropped_payload_limit(&self, count: u64) { + add_counter(&self.rows_dropped_payload_limit, count); + } + + fn add_rows_degraded_cardinality_cap(&self, count: u64) { + add_counter(&self.rows_degraded_cardinality_cap, count); + } + + fn add_rows_degraded_payload_limit(&self, count: u64) { + add_counter(&self.rows_degraded_payload_limit, count); + } + + fn add_payload_splits(&self, count: u64) { + add_counter(&self.payload_splits, count); + } + + fn collect_writer_stats(&self) -> FlagEvaluationEvpWriterStats { + FlagEvaluationEvpWriterStats { + rows_dropped_degraded_cap: self.rows_dropped_degraded_cap.swap(0, Ordering::Relaxed), + rows_dropped_payload_limit: self.rows_dropped_payload_limit.swap(0, Ordering::Relaxed), + rows_degraded_cardinality_cap: self + .rows_degraded_cardinality_cap + .swap(0, Ordering::Relaxed), + rows_degraded_payload_limit: self + .rows_degraded_payload_limit + .swap(0, Ordering::Relaxed), + payload_splits: self.payload_splits.swap(0, Ordering::Relaxed), + } + } +} + +fn add_counter(counter: &AtomicU64, count: u64) { + if count > 0 { + counter.fetch_add(count, Ordering::Relaxed); + } +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct EventKey { + flag_key: String, + variant_key: Option, + allocation_key: Option, + targeting_rule_key: Option, + targeting_key: Option, + context_evaluation: Option, + context_dd_service: Option, + error_message: Option, + runtime_default_used: bool, +} + +impl EventKey { + fn new(event: &FfeFlagEvaluationEvent) -> Self { + Self { + flag_key: event.flag.key.clone(), + variant_key: event.variant.as_ref().map(|v| v.key.clone()), + allocation_key: event.allocation.as_ref().map(|a| a.key.clone()), + targeting_rule_key: event.targeting_rule.as_ref().map(|r| r.key.clone()), + targeting_key: event.targeting_key.clone(), + context_evaluation: event + .context + .as_ref() + .and_then(|context| context.evaluation.clone()), + context_dd_service: event + .context + .as_ref() + .and_then(|context| context.dd.as_ref().map(|dd| dd.service.clone())), + error_message: event.error.as_ref().map(|e| e.message.clone()), + runtime_default_used: event.runtime_default_used, + } + } + + fn degraded(event: &FfeFlagEvaluationEvent) -> Self { + Self { + flag_key: event.flag.key.clone(), + variant_key: event.variant.as_ref().map(|v| v.key.clone()), + allocation_key: event.allocation.as_ref().map(|a| a.key.clone()), + targeting_rule_key: event.targeting_rule.as_ref().map(|r| r.key.clone()), + targeting_key: None, + context_evaluation: None, + context_dd_service: None, + error_message: event.error.as_ref().map(|e| e.message.clone()), + runtime_default_used: event.runtime_default_used, + } + } +} + +struct PendingDestination { + destination: D, + context: FfeTelemetryContext, + events: HashMap, +} + +struct CoalescerState { + destinations: HashMap>, + flush_running: bool, + pending_bucket_count: usize, + full_bucket_count: usize, + full_bucket_count_by_flag: HashMap, + degraded_bucket_count: usize, + dropped_overflow: u64, +} + +// Keep this manual: deriving `Default` adds an unnecessary `D: Default` bound, +// but the empty state only needs default maps and counters. +impl Default for CoalescerState { + fn default() -> Self { + Self { + destinations: HashMap::new(), + flush_running: false, + pending_bucket_count: 0, + full_bucket_count: 0, + full_bucket_count_by_flag: HashMap::new(), + degraded_bucket_count: 0, + dropped_overflow: 0, + } + } +} + +/// Shared flagevaluation coalescer. +/// +/// The generic destination is owned by the transport adapter. For the sidecar it +/// is the EVP proxy endpoint plus batch context; an agentless sender can use +/// its own endpoint identity without changing aggregation semantics. +pub struct FlagEvaluationEvpCoalescer { + state: Arc>>, + writer_stats: Arc, +} + +impl Clone for FlagEvaluationEvpCoalescer { + fn clone(&self) -> Self { + Self { + state: Arc::clone(&self.state), + writer_stats: Arc::clone(&self.writer_stats), + } + } +} + +// Keep this manual so callers can use destination key types without `Default`. +impl Default for FlagEvaluationEvpCoalescer { + fn default() -> Self { + Self { + state: Arc::new(Mutex::new(CoalescerState::default())), + writer_stats: Arc::new(FlagEvaluationEvpWriterCounters::default()), + } + } +} + +impl FlagEvaluationEvpCoalescer +where + D: Clone + Eq + Hash, +{ + /// Enqueue a batch and return whether the caller should start a flush loop. + pub fn enqueue(&self, destination: D, batch: FfeFlagEvaluationBatch) -> bool { + if batch.flag_evaluations.is_empty() { + return false; + } + + let mut state = lock_or_recover(&self.state); + state + .destinations + .entry(destination.clone()) + .or_insert_with(|| PendingDestination { + destination: destination.clone(), + context: batch.context, + events: HashMap::new(), + }); + + for mut event in batch.flag_evaluations { + let key = EventKey::new(&event); + if merge_pending_event(&mut state, &destination, &key, &event) { + continue; + } + + let flag_key = event.flag.key.clone(); + let full_bucket_count_for_flag = state + .full_bucket_count_by_flag + .get(&flag_key) + .copied() + .unwrap_or(0); + + if state.full_bucket_count < GLOBAL_CAP && full_bucket_count_for_flag < PER_FLAG_CAP { + if insert_pending_event(&mut state, &destination, key, event) { + state.full_bucket_count += 1; + *state.full_bucket_count_by_flag.entry(flag_key).or_default() += 1; + } + continue; + } + + event.targeting_key = None; + event.context = None; + let evaluation_count = event.evaluation_count; + let degraded_key = EventKey::degraded(&event); + if merge_pending_event(&mut state, &destination, °raded_key, &event) { + self.writer_stats + .add_rows_degraded_cardinality_cap(evaluation_count); + continue; + } + + if state.degraded_bucket_count >= DEGRADED_CAP + || state.pending_bucket_count >= GLOBAL_CAP + DEGRADED_CAP + { + state.dropped_overflow = state.dropped_overflow.saturating_add(evaluation_count); + self.writer_stats + .add_rows_dropped_degraded_cap(evaluation_count); + continue; + } + + if insert_pending_event(&mut state, &destination, degraded_key, event) { + state.degraded_bucket_count += 1; + self.writer_stats + .add_rows_degraded_cardinality_cap(evaluation_count); + } + } + + if state.flush_running { + false + } else { + state.flush_running = true; + true + } + } + + pub fn take_batches(&self) -> Vec<(D, FfeFlagEvaluationBatch)> { + let mut state = lock_or_recover(&self.state); + if state.dropped_overflow > 0 { + log::warn!( + "ffe flagevaluation coalescer dropped {} pending bucket(s) after cardinality cap", + state.dropped_overflow + ); + state.dropped_overflow = 0; + } + + let destinations = std::mem::take(&mut state.destinations); + state.pending_bucket_count = 0; + state.full_bucket_count = 0; + state.full_bucket_count_by_flag.clear(); + state.degraded_bucket_count = 0; + destinations + .into_values() + .filter_map(|pending| { + if pending.events.is_empty() { + return None; + } + Some(( + pending.destination, + FfeFlagEvaluationBatch { + context: pending.context, + flag_evaluations: pending.events.into_values().collect(), + }, + )) + }) + .collect() + } + + /// Return true when the caller's flush loop can stop. + pub fn finish_flush_cycle(&self) -> bool { + let mut state = lock_or_recover(&self.state); + if state.destinations.is_empty() { + state.flush_running = false; + true + } else { + false + } + } + + pub fn collect_writer_stats(&self) -> FlagEvaluationEvpWriterStats { + self.writer_stats.collect_writer_stats() + } + + pub fn record_payload_build_result(&self, result: &FlagEvaluationEvpPayloadBuildResult) { + self.writer_stats + .add_rows_dropped_payload_limit(result.dropped_oversized_rows); + self.writer_stats + .add_rows_degraded_payload_limit(result.degraded_oversized_rows); + self.writer_stats.add_payload_splits(result.payload_splits); + } + + #[cfg(test)] + fn force_bucket_counts_for_test(&self, full_bucket_count: usize, degraded_bucket_count: usize) { + let mut state = lock_or_recover(&self.state); + state.flush_running = true; + state.full_bucket_count = full_bucket_count; + state.degraded_bucket_count = degraded_bucket_count; + state.pending_bucket_count = full_bucket_count.saturating_add(degraded_bucket_count); + } +} + +fn lock_or_recover(mutex: &Mutex) -> std::sync::MutexGuard<'_, T> { + mutex.lock().unwrap_or_else(|e| e.into_inner()) +} + +fn merge_pending_event( + state: &mut CoalescerState, + destination: &D, + key: &EventKey, + event: &FfeFlagEvaluationEvent, +) -> bool +where + D: Eq + Hash, +{ + let Some(pending) = state.destinations.get_mut(destination) else { + return false; + }; + + if let Some(existing) = pending.events.get_mut(key) { + merge_event(existing, event); + true + } else { + false + } +} + +fn insert_pending_event( + state: &mut CoalescerState, + destination: &D, + key: EventKey, + event: FfeFlagEvaluationEvent, +) -> bool +where + D: Eq + Hash, +{ + let Some(pending) = state.destinations.get_mut(destination) else { + log::warn!("ffe flagevaluation coalescer missing pending destination; dropping event"); + return false; + }; + + pending.events.insert(key, event); + state.pending_bucket_count += 1; + true +} + +fn merge_event(existing: &mut FfeFlagEvaluationEvent, incoming: &FfeFlagEvaluationEvent) { + existing.timestamp = existing.timestamp.max(incoming.timestamp); + existing.first_evaluation = existing.first_evaluation.min(incoming.first_evaluation); + existing.last_evaluation = existing.last_evaluation.max(incoming.last_evaluation); + existing.evaluation_count = existing + .evaluation_count + .saturating_add(incoming.evaluation_count); +} + +pub fn encode_flag_evaluation_payloads( + batch: FfeFlagEvaluationBatch, + payload_size_limit: usize, +) -> Result { + let FfeFlagEvaluationBatch { + context, + flag_evaluations, + } = batch; + + let context_json = build_context_payload(&context)?; + let payload_prefix = format!(r#"{{"context":{context_json},"flagEvaluations":["#); + let payload_suffix = "]}"; + let base_payload_size = payload_prefix.len() + payload_suffix.len(); + + let mut result = FlagEvaluationEvpPayloadBuildResult::default(); + let mut current_events = Vec::new(); + let mut current_size = base_payload_size; + + for event in flag_evaluations { + if current_events.len() >= MAX_EVENTS_PER_POST { + push_payload( + &mut result.payloads, + &payload_prefix, + payload_suffix, + &mut current_events, + ); + current_size = base_payload_size; + } + + let mut encoded_event = build_event_payload(&event)?; + let mut event_size = encoded_event.len(); + if !single_event_fits(base_payload_size, event_size, payload_size_limit) { + let Some(degraded_event) = degrade_event_for_payload_limit(&event) else { + result.dropped_oversized_rows = result + .dropped_oversized_rows + .saturating_add(event.evaluation_count); + continue; + }; + + let degraded_encoded_event = build_event_payload(°raded_event)?; + let degraded_event_size = degraded_encoded_event.len(); + if !single_event_fits(base_payload_size, degraded_event_size, payload_size_limit) { + result.dropped_oversized_rows = result + .dropped_oversized_rows + .saturating_add(event.evaluation_count); + continue; + } + + encoded_event = degraded_encoded_event; + event_size = degraded_event_size; + result.degraded_oversized_rows = result + .degraded_oversized_rows + .saturating_add(degraded_event.evaluation_count); + } + + let separator_size = usize::from(!current_events.is_empty()); + if current_size + separator_size + event_size > payload_size_limit + && !current_events.is_empty() + { + push_payload( + &mut result.payloads, + &payload_prefix, + payload_suffix, + &mut current_events, + ); + current_size = base_payload_size; + } + + let separator_size = usize::from(!current_events.is_empty()); + current_size += separator_size + event_size; + current_events.push(encoded_event); + } + + if !current_events.is_empty() { + push_payload( + &mut result.payloads, + &payload_prefix, + payload_suffix, + &mut current_events, + ); + } + result.payload_splits = result.payloads.len().saturating_sub(1) as u64; + + Ok(result) +} + +fn single_event_fits( + base_payload_size: usize, + event_size: usize, + payload_size_limit: usize, +) -> bool { + base_payload_size.saturating_add(event_size) <= payload_size_limit +} + +fn push_payload( + payloads: &mut Vec, + payload_prefix: &str, + payload_suffix: &str, + encoded_events: &mut Vec, +) { + debug_assert!( + !encoded_events.is_empty(), + "callers should only push non-empty payload event groups" + ); + if encoded_events.is_empty() { + return; + } + + let events_size: usize = encoded_events.iter().map(String::len).sum(); + let separators_size = encoded_events.len().saturating_sub(1); + let mut payload = String::with_capacity( + payload_prefix.len() + events_size + separators_size + payload_suffix.len(), + ); + payload.push_str(payload_prefix); + for (idx, event) in encoded_events.iter().enumerate() { + if idx > 0 { + payload.push(','); + } + payload.push_str(event); + } + payload.push_str(payload_suffix); + payloads.push(payload); + encoded_events.clear(); +} + +fn degrade_event_for_payload_limit( + event: &FfeFlagEvaluationEvent, +) -> Option { + if event.targeting_key.is_none() && event.context.is_none() { + return None; + } + + let mut degraded = event.clone(); + degraded.targeting_key = None; + degraded.context = None; + Some(degraded) +} + +#[cfg(test)] +fn build_payload(batch: &FfeFlagEvaluationBatch) -> Result { + let context_json = build_context_payload(&batch.context)?; + let mut encoded_events = Vec::with_capacity(batch.flag_evaluations.len()); + for event in &batch.flag_evaluations { + encoded_events.push(build_event_payload(event)?); + } + + Ok(build_payload_from_encoded_events( + &context_json, + &encoded_events, + )) +} + +fn build_context_payload(context: &FfeTelemetryContext) -> Result { + let mut value = serde_json::to_value(context)?; + strip_placeholders(&mut value); + serde_json::to_string(&value) +} + +fn build_event_payload(event: &FfeFlagEvaluationEvent) -> Result { + let mut value = serde_json::to_value(event)?; + expand_event_context(&mut value); + strip_placeholders(&mut value); + serde_json::to_string(&value) +} + +#[cfg(test)] +fn build_payload_from_encoded_events(context_json: &str, encoded_events: &[String]) -> String { + let events_size: usize = encoded_events.iter().map(String::len).sum(); + let separators_size = encoded_events.len().saturating_sub(1); + let mut payload = String::with_capacity( + r#"{"context":"#.len() + + context_json.len() + + r#","flagEvaluations":["#.len() + + events_size + + separators_size + + "]}".len(), + ); + + payload.push_str(r#"{"context":"#); + payload.push_str(context_json); + payload.push_str(r#","flagEvaluations":["#); + for (idx, event) in encoded_events.iter().enumerate() { + if idx > 0 { + payload.push(','); + } + payload.push_str(event); + } + payload.push_str("]}"); + payload +} + +fn expand_event_context(event: &mut serde_json::Value) { + let Some(context) = event.get_mut("context") else { + return; + }; + let Some(evaluation) = context.get_mut("evaluation") else { + return; + }; + let Some(s) = evaluation.as_str() else { + return; + }; + + match serde_json::from_str::(s) { + Ok(parsed) => *evaluation = parsed, + Err(_) => { + if let Some(obj) = context.as_object_mut() { + obj.remove("evaluation"); + } + } + } +} + +fn strip_placeholders(value: &mut serde_json::Value) { + strip_placeholders_at(value, PlaceholderLocation::Root); +} + +#[derive(Clone, Copy, Eq, PartialEq)] +enum PlaceholderLocation { + Root, + RootContext, + Other, +} + +fn strip_placeholders_at(value: &mut serde_json::Value, location: PlaceholderLocation) { + match value { + serde_json::Value::Object(map) => { + for (key, child) in map.iter_mut() { + if !(location == PlaceholderLocation::RootContext && key == "evaluation") { + let child_location = + if location == PlaceholderLocation::Root && key == "context" { + PlaceholderLocation::RootContext + } else { + PlaceholderLocation::Other + }; + strip_placeholders_at(child, child_location); + } + } + map.retain(|key, v| !is_placeholder(key, v)); + } + serde_json::Value::Array(items) => { + for item in items.iter_mut() { + strip_placeholders_at(item, PlaceholderLocation::Other); + } + items.retain(|v| !is_array_placeholder(v)); + } + _ => {} + } +} + +fn is_placeholder(key: &str, value: &serde_json::Value) -> bool { + match value { + serde_json::Value::Null => true, + serde_json::Value::Bool(b) => key == "runtime_default_used" && !b, + serde_json::Value::String(s) => { + matches!(key, "service" | "env" | "version") && s.is_empty() + } + serde_json::Value::Object(map) => map.is_empty(), + serde_json::Value::Array(items) => items.is_empty(), + serde_json::Value::Number(_) => false, + } +} + +fn is_array_placeholder(value: &serde_json::Value) -> bool { + match value { + serde_json::Value::Null => true, + serde_json::Value::Object(map) => map.is_empty(), + serde_json::Value::Array(items) => items.is_empty(), + _ => false, + } +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::{json, Value}; + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn full_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 42, + variant: Some(VariantKey { + key: "on".to_owned(), + }), + allocation: Some(AllocationKey { + key: "alloc-a".to_owned(), + }), + targeting_key: Some("user-123".to_owned()), + targeting_rule: None, + context: Some(FlagEvalEventContext { + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("plan".to_owned(), json!("premium")); + m + }) + .unwrap(), + ), + dd: Some(ContextDD { + service: "frontend".to_owned(), + }), + }), + error: None, + runtime_default_used: false, + } + } + + // ── Test: required fields present in serialized JSON ────────────────────── + + #[test] + fn fully_populated_event_serializes_required_fields() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let json = serde_json::to_string(&batch).unwrap(); + let v: Value = serde_json::from_str(&json).unwrap(); + + assert_eq!(v["context"]["service"], "svc"); + assert_eq!(v["context"]["env"], "prod"); + assert_eq!(v["context"]["version"], "1"); + + let ev = &v["flagEvaluations"][0]; + assert_eq!(ev["flag"]["key"], "my-flag"); + assert!(ev["first_evaluation"].is_number()); + assert!(ev["last_evaluation"].is_number()); + assert_eq!(ev["evaluation_count"], 42); + assert_eq!(ev["variant"]["key"], "on"); + assert_eq!(ev["allocation"]["key"], "alloc-a"); + assert_eq!(ev["targeting_key"], "user-123"); + } + + fn degraded_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "flag-b".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 7, + variant: None, + allocation: None, + targeting_rule: None, + targeting_key: None, + context: None, + error: None, + runtime_default_used: false, + } + } + + // ── Test: degraded-tier event serializes optional fields as null ────────── + // + // The type does not use `skip_serializing_if` (bincode-wire safety), so on + // the wire `None`/`false` optional fields ARE present (as null/false). The + // null-placeholder stripping that the flageval-worker schema requires + // happens in the EVP payload encoder, not at the type level. + + #[test] + fn degraded_tier_event_serializes_optional_fields_as_null() { + let degraded = degraded_event(); + let json = serde_json::to_string(°raded).unwrap(); + let v: Value = serde_json::from_str(&json).unwrap(); + + // Required fields present. + assert_eq!(v["flag"]["key"], "flag-b"); + assert!(v["first_evaluation"].is_number()); + assert!(v["last_evaluation"].is_number()); + assert_eq!(v["evaluation_count"], 7); + + // Optional fields are present as null/false placeholders on the wire + // (stripped later by the flusher, not at the type level). + assert!(v["variant"].is_null(), "variant should serialize as null"); + assert!( + v["allocation"].is_null(), + "allocation should serialize as null" + ); + assert!( + v["targeting_rule"].is_null(), + "targeting_rule should serialize as null" + ); + assert!( + v["targeting_key"].is_null(), + "targeting_key should serialize as null" + ); + assert!(v["context"].is_null(), "context should serialize as null"); + assert!(v["error"].is_null(), "error should serialize as null"); + assert_eq!( + v["runtime_default_used"], false, + "runtime_default_used should serialize as false" + ); + } + + #[test] + fn cap_sizing_constants_preserve_default_caps() { + assert_eq!(EVAL_SCALE_FULL_BUCKET_TARGET, 125_000); + assert_eq!(EVAL_SCALE_PER_FLAG_BUCKET_TARGET, 10_000); + assert_eq!(EVAL_SCALE_DEGRADED_BUCKET_TARGET, 25_000); + assert_eq!(GLOBAL_CAP, 131_072); + assert_eq!(PER_FLAG_CAP, 10_000); + assert_eq!(DEGRADED_CAP, 32_768); + } + + // ── Test: bincode round-trip with mixed Some/None fields ────────────────── + // + // Mechanical guard for the worker→sidecar IPC bug: bincode is a + // non-self-describing codec, so any `skip_serializing_if` on these types + // would omit a field on serialize while derived Deserialize still expects it + // in order → field misalignment → the sidecar drops the batch. A batch + // mixing a full-tier event (Some fields) and degraded-tier event (None + // fields) must survive serialize→deserialize byte-for-byte. + + #[test] + fn batch_round_trips_via_bincode_with_mixed_optional_fields() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event(), degraded_event()], + }; + let bytes = bincode::serialize(&batch).expect("bincode serialize must succeed"); + let decoded: FfeFlagEvaluationBatch = + bincode::deserialize(&bytes).expect("bincode deserialize must succeed"); + assert_eq!( + batch, decoded, + "bincode round-trip must be lossless for a batch mixing Some and None fields" + ); + } + + // ── Test: context pruning — 256-field limit ─────────────────────────────── + + #[test] + fn context_pruning_keeps_at_most_256_fields() { + let mut attrs = BTreeMap::new(); + for i in 0..300usize { + attrs.insert(format!("key{i:04}"), json!(i.to_string())); + } + let pruned = prune_context(&attrs); + assert_eq!( + pruned.len(), + MAX_CONTEXT_FIELDS, + "pruned context must have at most {MAX_CONTEXT_FIELDS} fields" + ); + } + + // ── Test: context pruning — skip string values > 256 chars ─────────────── + + #[test] + fn context_pruning_skips_oversized_string_values() { + let mut attrs = BTreeMap::new(); + let long_value = "x".repeat(MAX_FIELD_LENGTH + 1); + attrs.insert("oversized".to_owned(), json!(long_value)); + attrs.insert("ok".to_owned(), json!("short")); + // Non-string values are kept regardless of length. + attrs.insert("num".to_owned(), json!(12345)); + + let pruned = prune_context(&attrs); + assert!( + !pruned.contains_key("oversized"), + "oversized string value must be skipped" + ); + assert!(pruned.contains_key("ok"), "short string value must be kept"); + assert!( + pruned.contains_key("num"), + "numeric value must be kept regardless of length" + ); + } + + // ── Test: batch round-trips via serde ──────────────────────────────────── + + #[test] + fn batch_round_trips_via_serde() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let json = serde_json::to_string(&batch).unwrap(); + let decoded: FfeFlagEvaluationBatch = serde_json::from_str(&json).unwrap(); + assert_eq!(batch, decoded); + } + + #[test] + fn payload_encoding_strips_degraded_tier_placeholders() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![degraded_event()], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: Value = serde_json::from_str(&payload).unwrap(); + let ev = &v["flagEvaluations"][0]; + + assert_eq!(ev["flag"]["key"], "flag-b"); + assert_eq!(ev["evaluation_count"], 7); + assert!(ev["first_evaluation"].is_number()); + assert!(ev["last_evaluation"].is_number()); + assert!(ev["timestamp"].is_number()); + assert!(ev.get("variant").is_none(), "variant must be stripped"); + assert!( + ev.get("allocation").is_none(), + "allocation must be stripped" + ); + assert!( + ev.get("targeting_rule").is_none(), + "targeting_rule must be stripped" + ); + assert!( + ev.get("targeting_key").is_none(), + "targeting_key must be stripped" + ); + assert!(ev.get("context").is_none(), "context must be stripped"); + assert!(ev.get("error").is_none(), "error must be stripped"); + assert!( + ev.get("runtime_default_used").is_none(), + "runtime_default_used=false must be stripped" + ); + } + + #[test] + fn payload_encoding_strips_empty_top_level_env_and_version_only_as_placeholders() { + let batch = FfeFlagEvaluationBatch { + context: FfeTelemetryContext { + service: "svc".to_owned(), + env: String::new(), + version: String::new(), + }, + flag_evaluations: vec![full_event()], + }; + + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: Value = serde_json::from_str(&payload).unwrap(); + + assert_eq!(v["context"]["service"], "svc"); + assert!( + v["context"].get("env").is_none(), + "empty env must be omitted from the request context" + ); + assert!( + v["context"].get("version").is_none(), + "empty version must be omitted from the request context" + ); + assert!( + v["context"].is_object(), + "request context must remain a JSON object" + ); + } + + #[test] + fn payload_encoding_keeps_full_tier_fields() { + let mut event = full_event(); + event.targeting_rule = Some(TargetingRuleKey { + key: "rule-1".to_owned(), + }); + event.error = Some(EvalError { + message: "boom".to_owned(), + }); + event.runtime_default_used = true; + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![event], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: Value = serde_json::from_str(&payload).unwrap(); + let ev = &v["flagEvaluations"][0]; + + assert_eq!(ev["variant"]["key"], "on", "variant must be kept"); + assert_eq!( + ev["allocation"]["key"], "alloc-a", + "allocation must be kept" + ); + assert_eq!( + ev["targeting_rule"]["key"], "rule-1", + "targeting_rule must be kept" + ); + assert_eq!( + ev["targeting_key"], "user-123", + "targeting_key must be kept" + ); + assert_eq!(ev["error"]["message"], "boom", "error must be kept"); + assert_eq!( + ev["runtime_default_used"], true, + "runtime_default_used=true must be kept" + ); + assert!( + ev.get("reason").is_none(), + "EVP payload must not emit top-level OpenFeature reason" + ); + + let ctx = &ev["context"]; + assert!( + ctx["evaluation"].is_object(), + "context.evaluation must be an object: {}", + ctx["evaluation"] + ); + assert_eq!(ctx["evaluation"]["plan"], "premium"); + assert_eq!( + ctx["dd"]["service"], "frontend", + "context.dd.service must be kept" + ); + } + + #[test] + fn payload_encoding_collapses_empty_nested_context() { + let mut ev = degraded_event(); + ev.context = Some(FlagEvalEventContext { + evaluation: None, + dd: Some(ContextDD { + service: String::new(), + }), + }); + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![ev], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: Value = serde_json::from_str(&payload).unwrap(); + + assert!( + v["flagEvaluations"][0].get("context").is_none(), + "a context that becomes empty after cleaning must be removed entirely" + ); + } + + #[test] + fn payload_encoding_expands_evaluation_string_into_object() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: Value = serde_json::from_str(&payload).unwrap(); + + let evaluation = &v["flagEvaluations"][0]["context"]["evaluation"]; + assert!( + evaluation.is_object(), + "context.evaluation must be a JSON object in the POST body, not a string: {evaluation}" + ); + assert_eq!( + evaluation["plan"], "premium", + "the expanded object must preserve the original key/value" + ); + assert!( + !evaluation.is_string(), + "context.evaluation must not remain a quoted string" + ); + } + + #[test] + fn payload_encoding_drops_unparseable_evaluation_gracefully() { + let mut event = full_event(); + event.context = Some(FlagEvalEventContext { + evaluation: Some("this is not json".to_owned()), + dd: None, + }); + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![event], + }; + + let payload = build_payload(&batch).expect("build_payload must not fail on bad input"); + let v: Value = serde_json::from_str(&payload).unwrap(); + + assert!( + v["flagEvaluations"][0]["context"] + .get("evaluation") + .is_none(), + "unparseable evaluation must be dropped from the body" + ); + } + + #[test] + fn placeholder_stripping_recurses_into_non_context_evaluation_objects() { + let mut value = json!({ + "evaluation": { + "empty_array": [], + "empty_object": {}, + "items": [null, {}, [], "kept"], + "null_value": null, + "present": "kept" + } + }); + + strip_placeholders(&mut value); + + assert_eq!( + value, + json!({ + "evaluation": { + "items": ["kept"], + "present": "kept" + } + }) + ); + } + + #[test] + fn placeholder_stripping_preserves_context_evaluation_subtree() { + let mut value = json!({ + "context": { + "evaluation": { + "enabled": false, + "empty": "", + "empty_array": [], + "empty_object": {}, + "null_value": null + }, + "dd": { + "service": "" + } + } + }); + + strip_placeholders(&mut value); + + let evaluation = &value["context"]["evaluation"]; + assert_eq!(evaluation["enabled"], false); + assert_eq!(evaluation["empty"], ""); + assert!(evaluation["empty_array"].is_array()); + assert!(evaluation["empty_object"].is_object()); + assert!(evaluation["null_value"].is_null()); + assert!(value["context"].get("dd").is_none()); + } + + #[test] + fn payload_encoding_preserves_false_and_empty_context_values() { + let mut event = full_event(); + event.context = Some(FlagEvalEventContext { + evaluation: Some( + json!({ + "enabled": false, + "empty": "", + "empty_object": {}, + "empty_array": [] + }) + .to_string(), + ), + dd: None, + }); + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![event], + }; + + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: Value = serde_json::from_str(&payload).unwrap(); + let evaluation = &v["flagEvaluations"][0]["context"]["evaluation"]; + + assert_eq!(evaluation["enabled"], false); + assert_eq!(evaluation["empty"], ""); + assert!(evaluation["empty_object"].is_object()); + assert!(evaluation["empty_array"].is_array()); + } + + #[test] + fn payload_encoding_empty_batch_has_no_payloads_or_splits() { + let result = encode_flag_evaluation_payloads( + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![], + }, + EVP_PAYLOAD_SIZE_LIMIT, + ) + .expect("payload build must succeed"); + + assert!(result.payloads.is_empty()); + assert_eq!(result.payload_splits, 0); + assert_eq!(result.dropped_oversized_rows, 0); + assert_eq!(result.degraded_oversized_rows, 0); + } + + #[test] + fn payload_encoding_splits_by_encoded_byte_limit() { + let event = full_event(); + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![event.clone(), event.clone(), event.clone()], + }; + let one_event_limit = build_payload(&FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![event], + }) + .unwrap() + .len(); + + let result = encode_flag_evaluation_payloads(batch, one_event_limit) + .expect("payload build must succeed"); + + assert_eq!(result.dropped_oversized_rows, 0); + assert_eq!(result.degraded_oversized_rows, 0); + assert_eq!(result.payload_splits, result.payloads.len() as u64 - 1); + assert_eq!( + result.payloads.len(), + 3, + "the byte limit should split before a second event is appended" + ); + for payload in &result.payloads { + assert!( + payload.len() <= one_event_limit, + "payload length {} exceeded limit {}: {}", + payload.len(), + one_event_limit, + payload + ); + } + } + + #[test] + fn payload_encoding_degrades_oversized_full_event_before_drop() { + let mut oversized = full_event(); + oversized.targeting_rule = Some(TargetingRuleKey { + key: "rule-1".to_owned(), + }); + oversized.error = Some(EvalError { + message: "boom".to_owned(), + }); + oversized.runtime_default_used = true; + oversized.context = Some(FlagEvalEventContext { + evaluation: Some(json!({ "blob": "x".repeat(1024) }).to_string()), + dd: Some(ContextDD { + service: "frontend".to_owned(), + }), + }); + + let degraded = degrade_event_for_payload_limit(&oversized) + .expect("full event should have a degraded form"); + let degraded_limit = build_payload(&FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![degraded], + }) + .unwrap() + .len(); + let full_size = build_payload(&FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![oversized.clone()], + }) + .unwrap() + .len(); + assert!( + full_size > degraded_limit, + "test setup must make the full event exceed the degraded limit" + ); + + let result = encode_flag_evaluation_payloads( + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![oversized], + }, + degraded_limit, + ) + .expect("payload build must succeed"); + + assert_eq!(result.dropped_oversized_rows, 0); + assert_eq!(result.degraded_oversized_rows, 42); + assert_eq!(result.payload_splits, 0); + assert_eq!(result.payloads.len(), 1); + assert!(result.payloads[0].len() <= degraded_limit); + + let v: Value = serde_json::from_str(&result.payloads[0]).unwrap(); + let ev = &v["flagEvaluations"][0]; + assert!( + ev.get("targeting_key").is_none(), + "oversized full row must omit targeting_key after degradation" + ); + assert!( + ev.get("context").is_none(), + "oversized full row must omit context after degradation" + ); + assert_eq!(ev["variant"]["key"], "on"); + assert_eq!(ev["allocation"]["key"], "alloc-a"); + assert_eq!(ev["targeting_rule"]["key"], "rule-1"); + assert_eq!(ev["error"]["message"], "boom"); + } + + #[test] + fn payload_encoding_drops_oversized_degraded_event() { + let mut oversized = degraded_event(); + oversized.flag.key = "x".repeat(1024); + + let result = encode_flag_evaluation_payloads( + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![oversized], + }, + 128, + ) + .expect("payload build must succeed"); + + assert!(result.payloads.is_empty()); + assert_eq!(result.dropped_oversized_rows, 7); + assert_eq!(result.degraded_oversized_rows, 0); + assert_eq!(result.payload_splits, 0); + } + + #[test] + fn coalescer_coalesces_identical_batches() { + let coalescer = FlagEvaluationEvpCoalescer::::default(); + let destination = "agent".to_owned(); + + assert!(coalescer.enqueue( + destination.clone(), + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }, + )); + assert!(!coalescer.enqueue( + destination, + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }, + )); + + let batches = coalescer.take_batches(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].1.flag_evaluations.len(), 1); + assert_eq!(batches[0].1.flag_evaluations[0].evaluation_count, 84); + assert!(coalescer.finish_flush_cycle()); + } + + #[test] + fn coalescer_degrades_after_per_flag_cap() { + let coalescer = FlagEvaluationEvpCoalescer::::default(); + let mut events = Vec::with_capacity(PER_FLAG_CAP + 50); + for index in 0..(PER_FLAG_CAP + 50) { + let mut event = full_event(); + event.evaluation_count = 1; + event.targeting_key = Some(format!("user-{index}")); + event.targeting_rule = Some(TargetingRuleKey { + key: "rule-1".to_owned(), + }); + events.push(event); + } + + assert!(coalescer.enqueue( + "agent".to_owned(), + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: events, + }, + )); + + let batches = coalescer.take_batches(); + assert_eq!(batches.len(), 1); + let events = &batches[0].1.flag_evaluations; + let full_events = events + .iter() + .filter(|event| event.targeting_key.is_some() || event.context.is_some()) + .count(); + let degraded = events + .iter() + .find(|event| event.targeting_key.is_none() && event.context.is_none()) + .expect("overflow must be folded into a degraded bucket"); + + assert_eq!(full_events, PER_FLAG_CAP); + assert_eq!(degraded.evaluation_count, 50); + assert_eq!( + degraded.variant.as_ref().map(|v| v.key.as_str()), + Some("on") + ); + assert_eq!( + degraded.allocation.as_ref().map(|a| a.key.as_str()), + Some("alloc-a") + ); + assert_eq!( + degraded + .targeting_rule + .as_ref() + .map(|rule| rule.key.as_str()), + Some("rule-1") + ); + + let stats = coalescer.collect_writer_stats(); + assert_eq!(stats.rows_degraded_cardinality_cap, 50); + assert_eq!(stats.rows_dropped_degraded_cap, 0); + } + + #[test] + fn coalescer_counts_degraded_cap_drops_by_evaluation_count() { + let coalescer = FlagEvaluationEvpCoalescer::::default(); + coalescer.force_bucket_counts_for_test(GLOBAL_CAP, DEGRADED_CAP); + let mut event = full_event(); + event.evaluation_count = 9; + + assert!(!coalescer.enqueue( + "agent".to_owned(), + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![event], + }, + )); + + let stats = coalescer.collect_writer_stats(); + assert_eq!(stats.rows_dropped_degraded_cap, 9); + assert_eq!(stats.rows_degraded_cardinality_cap, 0); + } +} diff --git a/datadog-ffe/src/telemetry/flagevaluation/sender.rs b/datadog-ffe/src/telemetry/flagevaluation/sender.rs new file mode 100644 index 0000000000..bf21efee85 --- /dev/null +++ b/datadog-ffe/src/telemetry/flagevaluation/sender.rs @@ -0,0 +1,509 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use super::{encode_flag_evaluation_payloads, FfeFlagEvaluationBatch}; +use http::uri::PathAndQuery; +use http::Method; +use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; +use libdd_common::Endpoint; +use std::time::Duration; + +/// EVP proxy path for FFE flag evaluation intake. +pub const EVP_FLAGEVALUATION_PATH: &str = "/evp_proxy/v2/api/v2/flagevaluation"; +/// EVP subdomain header name. +pub const EVP_SUBDOMAIN_HEADER: &str = "X-Datadog-EVP-Subdomain"; +/// EVP subdomain that routes requests to event-platform intake. +pub const EVP_SUBDOMAIN_VALUE: &str = "event-platform-intake"; +/// Agent EVP proxy uncompressed request-body limit. +/// +/// Revalidated against `DataDog/datadog-agent` on 2026-07-01: +/// `pkg/config/setup/apm.go` defaults `evp_proxy_config.max_payload_size` to +/// `10*1024*1024`; `comp/trace/config/impl/setup.go` copies that value into +/// `EVPProxy.MaxPayloadSize`; and `pkg/trace/api/evp_proxy.go` enforces it via +/// `apiutil.NewLimitedReader`. +pub const EVP_PAYLOAD_SIZE_LIMIT: usize = 10 * 1024 * 1024; + +#[derive(Clone, Debug)] +pub struct FlagEvaluationEvpSendConfig { + user_agent: String, + payload_size_limit: usize, +} + +impl FlagEvaluationEvpSendConfig { + pub fn new(user_agent: impl Into) -> Self { + Self { + user_agent: user_agent.into(), + payload_size_limit: EVP_PAYLOAD_SIZE_LIMIT, + } + } + + pub fn with_payload_size_limit(mut self, payload_size_limit: usize) -> Self { + self.payload_size_limit = payload_size_limit; + self + } +} + +/// Build the Agent EVP proxy endpoint for FFE flag evaluation intake. +/// +/// This preserves the base endpoint's scheme, authority, timeout, and test +/// token while swapping the path to `/evp_proxy/v2/api/v2/flagevaluation`. +/// Agentless submission is not wired yet, so API-key endpoints are rejected +/// until this sender grows direct intake routing. +pub fn flagevaluation_agent_proxy_endpoint(base: &Endpoint) -> Option { + if base.api_key.is_some() { + return None; + } + + let mut parts = base.url.clone().into_parts(); + parts.path_and_query = Some(PathAndQuery::from_static(EVP_FLAGEVALUATION_PATH)); + let url = http::Uri::from_parts(parts).ok()?; + Some(Endpoint { + url, + ..base.clone() + }) +} + +/// POST a structured FFE flag evaluation batch through the Agent EVP proxy. +/// +/// Returns the payload-build result after all generated payloads have been +/// attempted. HTTP failures are logged and dropped, matching the fire-and-forget +/// SDK behavior. +pub async fn send_flag_evaluation_batch( + client: &C, + endpoint: &Endpoint, + batch: FfeFlagEvaluationBatch, + config: &FlagEvaluationEvpSendConfig, +) -> Option { + let result = match encode_flag_evaluation_payloads(batch, config.payload_size_limit) { + Ok(result) => result, + Err(e) => { + log::debug!("ffe flagevaluation sender failed to encode batch payload: {e:?}"); + return None; + } + }; + + if result.dropped_oversized_rows > 0 { + log::debug!( + "ffe flagevaluation sender dropped {} flag evaluation row(s) because they exceeded the {} byte EVP payload limit after degradation", + result.dropped_oversized_rows, + config.payload_size_limit + ); + } + + for payload in &result.payloads { + send_payload(client, endpoint, payload.clone(), config).await; + } + + Some(result) +} + +async fn send_payload( + client: &C, + endpoint: &Endpoint, + payload: String, + config: &FlagEvaluationEvpSendConfig, +) { + let builder = match endpoint.to_request_builder(&config.user_agent) { + Ok(b) => b, + Err(e) => { + log::debug!("ffe flagevaluation sender failed to build request: {e:?}"); + return; + } + }; + + let req = match builder + .method(Method::POST) + .header("Content-Type", "application/json") + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .body(Bytes::from(payload)) + { + Ok(r) => r, + Err(e) => { + log::debug!("ffe flagevaluation sender failed to construct request body: {e:?}"); + return; + } + }; + + let timeout = Duration::from_millis(endpoint.timeout_ms); + let response = tokio::select! { + biased; + result = client.request(req) => result, + _ = client.sleep(timeout) => { + log::debug!("ffe flagevaluation sender request timed out after {timeout:?}"); + return; + } + }; + + match response { + Ok(resp) => { + let status = resp.status(); + if !status.is_success() { + let body_preview = truncate(resp.body().as_ref(), 256); + log::debug!("ffe flagevaluation sender non-2xx response {status}: {body_preview}"); + } else { + log::debug!( + "ffe flagevaluation sender sent flag evaluation batch, status={status}" + ); + } + } + Err(e) => { + log::debug!("ffe flagevaluation sender request failed: {e:?}"); + } + } +} + +fn truncate(bytes: &[u8], cap: usize) -> String { + let take = bytes.len().min(cap); + String::from_utf8_lossy(&bytes[..take]).into_owned() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::telemetry::FfeTelemetryContext; + use httpmock::MockServer; + use libdd_capabilities::{HttpError, MaybeSend}; + use libdd_capabilities_impl::NativeCapabilities; + use serde_json::json; + use std::collections::BTreeMap; + use std::future; + use std::sync::{Mutex, Once}; + + use super::super::{ + AllocationKey, ContextDD, EvalError, FfeFlagEvaluationEvent, FlagEvalEventContext, FlagKey, + TargetingRuleKey, VariantKey, MAX_EVENTS_PER_POST, + }; + + #[derive(Clone)] + struct CapturedLog { + level: log::Level, + message: String, + } + + struct CapturingLogger { + records: Mutex>, + } + + impl log::Log for CapturingLogger { + fn enabled(&self, metadata: &log::Metadata<'_>) -> bool { + metadata.level() <= log::Level::Debug + } + + fn log(&self, record: &log::Record<'_>) { + if self.enabled(record.metadata()) { + self.records.lock().unwrap().push(CapturedLog { + level: record.level(), + message: record.args().to_string(), + }); + } + } + + fn flush(&self) {} + } + + static TEST_LOGGER: CapturingLogger = CapturingLogger { + records: Mutex::new(Vec::new()), + }; + static INIT_LOGGER: Once = Once::new(); + + fn start_log_capture() { + INIT_LOGGER.call_once(|| { + let _ = log::set_logger(&TEST_LOGGER); + log::set_max_level(log::LevelFilter::Debug); + }); + TEST_LOGGER.records.lock().unwrap().clear(); + } + + fn captured_logs() -> Vec { + TEST_LOGGER.records.lock().unwrap().clone() + } + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn full_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 42, + variant: Some(VariantKey { + key: "on".to_owned(), + }), + allocation: Some(AllocationKey { + key: "alloc-a".to_owned(), + }), + targeting_key: Some("user-123".to_owned()), + targeting_rule: Some(TargetingRuleKey { + key: "rule-1".to_owned(), + }), + context: Some(FlagEvalEventContext { + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("plan".to_owned(), json!("premium")); + m + }) + .unwrap(), + ), + dd: Some(ContextDD { + service: "frontend".to_owned(), + }), + }), + error: Some(EvalError { + message: "boom".to_owned(), + }), + runtime_default_used: true, + } + } + + fn endpoint_for(server: &MockServer) -> Endpoint { + Endpoint { + url: server.url("/").parse().unwrap(), + ..Endpoint::default() + } + } + + fn send_config() -> FlagEvaluationEvpSendConfig { + FlagEvaluationEvpSendConfig::new("datadog-ffe-test/0.0.0") + } + + fn batch() -> FfeFlagEvaluationBatch { + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + } + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn posts_to_evp_proxy() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATION_PATH) + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .header("content-type", "application/json"); + then.status(202); + }) + .await; + + let ep = flagevaluation_agent_proxy_endpoint(&endpoint_for(&server)).unwrap(); + let client = NativeCapabilities::new_client(); + + send_flag_evaluation_batch(&client, &ep, batch(), &send_config()).await; + + mock.assert_async().await; + assert_eq!(mock.calls_async().await, 1); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn splits_large_batches_before_posting() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATION_PATH) + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .header("content-type", "application/json"); + then.status(202); + }) + .await; + + let ep = flagevaluation_agent_proxy_endpoint(&endpoint_for(&server)).unwrap(); + let client = NativeCapabilities::new_client(); + let mut batch = batch(); + let event = batch.flag_evaluations[0].clone(); + batch.flag_evaluations = vec![event; MAX_EVENTS_PER_POST * 2 + 1]; + + send_flag_evaluation_batch(&client, &ep, batch, &send_config()).await; + + mock.assert_calls_async(3).await; + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn send_batch_splits_posts_by_encoded_byte_limit() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATION_PATH) + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .header("content-type", "application/json"); + then.status(202); + }) + .await; + + let ep = flagevaluation_agent_proxy_endpoint(&endpoint_for(&server)).unwrap(); + let client = NativeCapabilities::new_client(); + let mut batch = batch(); + let event = batch.flag_evaluations[0].clone(); + batch.flag_evaluations = vec![event; 3]; + let one_event_limit = encode_flag_evaluation_payloads( + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![batch.flag_evaluations[0].clone()], + }, + usize::MAX, + ) + .unwrap() + .payloads + .into_iter() + .next() + .unwrap() + .len(); + let config = send_config().with_payload_size_limit(one_event_limit); + + send_flag_evaluation_batch(&client, &ep, batch, &config).await; + + mock.assert_calls_async(3).await; + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn non_2xx_does_not_panic() { + let server = MockServer::start_async().await; + let _mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATION_PATH); + then.status(500).body("intake overloaded"); + }) + .await; + + let ep = flagevaluation_agent_proxy_endpoint(&endpoint_for(&server)).unwrap(); + let client = NativeCapabilities::new_client(); + + send_flag_evaluation_batch(&client, &ep, batch(), &send_config()).await; + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn failure_paths_log_at_debug_level() { + start_log_capture(); + + let server = MockServer::start_async().await; + let _mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATION_PATH); + then.status(500).body("intake overloaded"); + }) + .await; + let ep = flagevaluation_agent_proxy_endpoint(&endpoint_for(&server)).unwrap(); + let client = NativeCapabilities::new_client(); + + send_flag_evaluation_batch(&client, &ep, batch(), &send_config()).await; + + let mut oversized = full_event(); + oversized.flag.key = "x".repeat(1024); + let result = send_flag_evaluation_batch( + &client, + &ep, + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![oversized], + }, + &send_config().with_payload_size_limit(128), + ) + .await + .expect("payload build should succeed"); + assert_eq!(result.dropped_oversized_rows, 42); + + let records = captured_logs(); + for pattern in [ + "ffe flagevaluation sender non-2xx response 500", + "ffe flagevaluation sender dropped 42 flag evaluation row(s)", + ] { + assert!( + records + .iter() + .any(|record| record.level == log::Level::Debug + && record.message.contains(pattern)), + "expected debug log containing {pattern:?}; got {:?}", + records + .iter() + .map(|record| (&record.level, &record.message)) + .collect::>() + ); + assert!( + !records + .iter() + .any(|record| record.level == log::Level::Warn + && record.message.contains(pattern)), + "expected no warn log containing {pattern:?}" + ); + } + } + + #[tokio::test] + async fn timeout_returns_without_waiting_for_http_response() { + let ep = Endpoint { + url: "http://localhost:8126".parse().unwrap(), + timeout_ms: 1, + ..Endpoint::default() + }; + + send_flag_evaluation_batch(&HangingCapabilities, &ep, batch(), &send_config()).await; + } + + #[test] + fn endpoint_preserves_authority_overrides_path() { + let base = Endpoint { + url: "http://agent.internal:8126/v0.4/traces".parse().unwrap(), + ..Endpoint::default() + }; + let ep = flagevaluation_agent_proxy_endpoint(&base).unwrap(); + assert_eq!(ep.url.scheme_str(), Some("http")); + assert_eq!(ep.url.authority().unwrap().as_str(), "agent.internal:8126"); + assert_eq!(ep.url.path(), EVP_FLAGEVALUATION_PATH); + } + + #[test] + fn endpoint_rejects_agentless() { + let base = Endpoint { + url: "https://trace.agent.datadoghq.com/v0.4/traces" + .parse() + .unwrap(), + api_key: Some("api-key".into()), + ..Endpoint::default() + }; + assert!(flagevaluation_agent_proxy_endpoint(&base).is_none()); + } + + #[derive(Clone, Debug)] + struct HangingCapabilities; + + impl HttpClientCapability for HangingCapabilities { + fn new_client() -> Self { + Self + } + + fn request( + &self, + _req: http::Request, + ) -> impl future::Future, HttpError>> + MaybeSend + { + future::pending() + } + } + + impl SleepCapability for HangingCapabilities { + fn new() -> Self { + Self + } + + async fn sleep(&self, _duration: Duration) {} + } +} diff --git a/datadog-ffe/src/telemetry/mod.rs b/datadog-ffe/src/telemetry/mod.rs index 6b5e851025..4fad83f38c 100644 --- a/datadog-ffe/src/telemetry/mod.rs +++ b/datadog-ffe/src/telemetry/mod.rs @@ -5,6 +5,8 @@ pub mod evaluation_metrics; #[cfg(feature = "exposure-events")] pub mod exposures; +#[cfg(feature = "flagevaluation-evp")] +pub mod flagevaluation; use serde::{Deserialize, Serialize}; diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 13f915f9e9..0be5358081 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -22,10 +22,15 @@ use datadog_sidecar::service::agent_info::AgentInfoReader; use datadog_sidecar::service::telemetry::InternalTelemetryAction; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, - DynamicInstrumentationConfigState, FfeEvaluationMetric as SidecarFfeEvaluationMetric, - FfeExposure as SidecarFfeExposure, FfeExposureBatch as SidecarFfeExposureBatch, - FfeTelemetryContext as SidecarFfeTelemetryContext, InstanceId, QueueId, RuntimeMetadata, - SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarFlushOptions, + AllocationKey, ContextDD, DynamicInstrumentationConfigState, EvalError, + FfeEvaluationMetric as SidecarFfeEvaluationMetric, FfeExposure as SidecarFfeExposure, + FfeExposureBatch as SidecarFfeExposureBatch, + FfeFlagEvaluationBatch as SidecarFfeFlagEvaluationBatch, + FfeFlagEvaluationEvent as SidecarFfeFlagEvaluationEvent, + FfeTelemetryContext as SidecarFfeTelemetryContext, FlagEvalEventContext, FlagKey, InstanceId, + QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, + SidecarFlushOptions, TargetingRuleKey, VariantKey, MAX_CONTEXT_DEPTH, MAX_CONTEXT_FIELDS, + MAX_FIELD_LENGTH, }; use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions}; use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader}; @@ -1209,6 +1214,25 @@ pub struct FfeEvaluationMetric<'a> { pub allocation_key: CharSlice<'a>, } +#[repr(C)] +pub struct FfeFlagEvaluation<'a> { + pub timestamp_ms: i64, + pub flag_key: CharSlice<'a>, + pub first_evaluation_ms: i64, + pub last_evaluation_ms: i64, + pub evaluation_count: u64, + pub variant: CharSlice<'a>, + pub allocation_key: CharSlice<'a>, + pub targeting_rule_key: CharSlice<'a>, + pub targeting_key: CharSlice<'a>, + /// UTF-8 JSON object. Empty, invalid, or non-object JSON is omitted. Object + /// values are pruned to 256 leaf fields, 256-byte string values, and four + /// levels of nested context depth. + pub evaluation_context_json: CharSlice<'a>, + pub error_message: CharSlice<'a>, + pub runtime_default_used: bool, +} + /// Send structured FFE exposure events to the sidecar. The sidecar owns /// deduplication, JSON serialization, and Agent EVP delivery. This function is /// caller-driven; shared libdatadog evaluator calls do not log unless an SDK @@ -1280,6 +1304,78 @@ fn ddog_sidecar_send_ffe_exposure_batch_impl( MaybeError::None } +/// Send structured FFE flag evaluation events to the sidecar. The sidecar owns +/// JSON serialization and Agent EVP delivery. This function is caller-driven; +/// callers must aggregate and bound event cardinality before passing a batch. +/// +/// # Safety +/// `context` and every element in `flag_evaluations` must contain valid UTF-8 +/// `CharSlice` values. Empty `flag_evaluations` is a no-op. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_send_ffe_flag_evaluation_batch( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + context: &FfeTelemetryContext<'_>, + flag_evaluations: Slice>, +) -> MaybeError { + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + ddog_sidecar_send_ffe_flag_evaluation_batch_impl( + transport, + instance_id, + queue_id, + context, + flag_evaluations, + ) + })) + .unwrap_or_else(|panic| { + MaybeError::Some(libdd_common_ffi::utils::handle_panic_error( + panic, + "ddog_sidecar_send_ffe_flag_evaluation_batch", + )) + }) +} + +fn ddog_sidecar_send_ffe_flag_evaluation_batch_impl( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + context: &FfeTelemetryContext<'_>, + flag_evaluations: Slice>, +) -> MaybeError { + let flag_evaluations = try_c!(flag_evaluations + .try_as_slice() + .map_err(|e| format!("Invalid flag evaluation slice: {e}"))); + + if flag_evaluations.is_empty() { + return MaybeError::None; + } + + let context = try_c!(ffe_context_from_ffi(context)); + let flag_evaluations = try_c!(flag_evaluations + .iter() + .map(|event| ffe_flag_evaluation_from_ffi(event, &context.service)) + .collect::, _>>()); + + if flag_evaluations.is_empty() { + return MaybeError::None; + } + + try_c!(blocking::enqueue_actions( + transport, + instance_id, + queue_id, + vec![SidecarAction::FfeFlagEvaluationBatch( + SidecarFfeFlagEvaluationBatch { + context, + flag_evaluations, + } + )], + )); + MaybeError::None +} + /// Send structured FFE evaluation metric events to the sidecar. The sidecar /// owns aggregation, OTLP/protobuf serialization, and OTLP HTTP delivery. This /// function is caller-driven so SDKs with existing host-language hooks can @@ -1344,6 +1440,107 @@ fn ffe_exposure_from_ffi(exposure: &FfeExposure<'_>) -> Result, + service: &str, +) -> Result { + let evaluation = optional_json_object_string(event.evaluation_context_json)?; + let dd = (!service.is_empty()).then(|| ContextDD { + service: service.to_owned(), + }); + let context = if evaluation.is_some() || dd.is_some() { + Some(FlagEvalEventContext { evaluation, dd }) + } else { + None + }; + + Ok(SidecarFfeFlagEvaluationEvent { + timestamp: event.timestamp_ms, + flag: FlagKey { + key: char_slice_to_string(event.flag_key)?, + }, + first_evaluation: event.first_evaluation_ms, + last_evaluation: event.last_evaluation_ms, + evaluation_count: event.evaluation_count, + variant: optional_string(event.variant)?.map(|key| VariantKey { key }), + allocation: optional_string(event.allocation_key)?.map(|key| AllocationKey { key }), + targeting_rule: optional_string(event.targeting_rule_key)? + .map(|key| TargetingRuleKey { key }), + targeting_key: optional_string(event.targeting_key)?, + context, + error: optional_string(event.error_message)?.map(|message| EvalError { message }), + runtime_default_used: event.runtime_default_used, + }) +} + +fn prune_evaluation_context_json(value: serde_json::Value) -> Option { + let serde_json::Value::Object(attrs) = value else { + return None; + }; + + let mut remaining_fields = MAX_CONTEXT_FIELDS; + let pruned = prune_context_object(&attrs, 1, &mut remaining_fields); + Some(serde_json::Value::Object(pruned).to_string()) +} + +fn prune_context_object( + attrs: &serde_json::Map, + depth: usize, + remaining_fields: &mut usize, +) -> serde_json::Map { + let mut keys: Vec<_> = attrs.keys().collect(); + keys.sort(); + + let mut pruned = serde_json::Map::new(); + for key in keys { + if *remaining_fields == 0 { + break; + } + let Some(value) = attrs + .get(key) + .and_then(|value| prune_context_value(value, depth, remaining_fields)) + else { + continue; + }; + pruned.insert(key.clone(), value); + } + pruned +} + +fn prune_context_value( + value: &serde_json::Value, + depth: usize, + remaining_fields: &mut usize, +) -> Option { + match value { + serde_json::Value::String(s) if s.len() > MAX_FIELD_LENGTH => None, + serde_json::Value::Object(attrs) => { + if depth >= MAX_CONTEXT_DEPTH { + return None; + } + let pruned = prune_context_object(attrs, depth + 1, remaining_fields); + (!pruned.is_empty()).then_some(serde_json::Value::Object(pruned)) + } + serde_json::Value::Array(values) => { + if depth >= MAX_CONTEXT_DEPTH { + return None; + } + let pruned: Vec<_> = values + .iter() + .filter_map(|value| prune_context_value(value, depth + 1, remaining_fields)) + .collect(); + (!pruned.is_empty()).then_some(serde_json::Value::Array(pruned)) + } + _ => { + if *remaining_fields == 0 { + return None; + } + *remaining_fields -= 1; + Some(value.clone()) + } + } +} + fn ffe_metric_from_ffi( metric: &FfeEvaluationMetric<'_>, ) -> Result { @@ -1364,6 +1561,17 @@ fn optional_string(slice: CharSlice) -> Result, String> { } } +fn optional_json_object_string(slice: CharSlice) -> Result, String> { + let Some(raw) = optional_string(slice)? else { + return Ok(None); + }; + let value = match serde_json::from_str::(&raw) { + Ok(value) => value, + Err(_) => return Ok(None), + }; + Ok(prune_evaluation_context_json(value)) +} + #[no_mangle] #[allow(clippy::missing_safety_doc)] #[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals @@ -1800,6 +2008,23 @@ mod tests { use super::*; use std::borrow::Cow; + fn ffi_flag_evaluation<'a>(evaluation_context_json: &'a str) -> FfeFlagEvaluation<'a> { + FfeFlagEvaluation { + timestamp_ms: 1_700_000_000_000, + flag_key: CharSlice::from("flag-a"), + first_evaluation_ms: 1_699_999_000_000, + last_evaluation_ms: 1_700_000_000_000, + evaluation_count: 7, + variant: CharSlice::empty(), + allocation_key: CharSlice::empty(), + targeting_rule_key: CharSlice::empty(), + targeting_key: CharSlice::empty(), + evaluation_context_json: CharSlice::from(evaluation_context_json), + error_message: CharSlice::empty(), + runtime_default_used: false, + } + } + #[test] fn otlp_metrics_endpoint_inherits_agent_test_token_when_missing() { let agent_endpoint = Endpoint { @@ -1833,4 +2058,72 @@ mod tests { assert_eq!(endpoint.test_token.as_deref(), Some("metrics-token")); } + + #[test] + fn ffe_flag_evaluation_preserves_service_without_evaluation_context() { + let event = ffi_flag_evaluation(""); + + let converted = ffe_flag_evaluation_from_ffi(&event, "checkout").unwrap(); + let context = converted.context.expect("service attribution must remain"); + + assert!(context.evaluation.is_none()); + assert_eq!( + context.dd.expect("dd context must be present").service, + "checkout" + ); + } + + #[test] + fn ffe_flag_evaluation_prunes_context_field_count_and_long_strings() { + let mut attrs = serde_json::Map::new(); + attrs.insert( + "aaa_long".to_owned(), + serde_json::Value::String("x".repeat(MAX_FIELD_LENGTH + 1)), + ); + for index in 0..=MAX_CONTEXT_FIELDS { + attrs.insert(format!("field_{index:03}"), serde_json::json!(index)); + } + let raw = serde_json::Value::Object(attrs).to_string(); + let event = ffi_flag_evaluation(&raw); + + let converted = ffe_flag_evaluation_from_ffi(&event, "checkout").unwrap(); + let context = converted.context.expect("context must be present"); + let evaluation = context.evaluation.expect("evaluation context must remain"); + let value: serde_json::Value = serde_json::from_str(&evaluation).unwrap(); + let attrs = value.as_object().unwrap(); + + assert_eq!(attrs.len(), MAX_CONTEXT_FIELDS); + assert!(!attrs.contains_key("aaa_long")); + assert!(attrs.contains_key("field_000")); + assert!(attrs.contains_key(&format!("field_{:03}", MAX_CONTEXT_FIELDS - 1))); + assert!(!attrs.contains_key(&format!("field_{MAX_CONTEXT_FIELDS:03}"))); + } + + #[test] + fn ffe_flag_evaluation_prunes_context_beyond_depth_four() { + let raw = serde_json::json!({ + "a": { + "b": { + "c": { + "d": "kept", + "too_deep": { + "e": "dropped" + } + } + } + }, + "top": true + }) + .to_string(); + let event = ffi_flag_evaluation(&raw); + + let converted = ffe_flag_evaluation_from_ffi(&event, "checkout").unwrap(); + let context = converted.context.expect("context must be present"); + let evaluation = context.evaluation.expect("evaluation context must remain"); + let value: serde_json::Value = serde_json::from_str(&evaluation).unwrap(); + + assert_eq!(value["a"]["b"]["c"]["d"], "kept"); + assert!(value["a"]["b"]["c"].get("too_deep").is_none()); + assert_eq!(value["top"], true); + } } diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index b3f8e4b28b..ecb7ae151d 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -29,7 +29,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" } libdd-trace-stats = { path = "../libdd-trace-stats", features = ["telemetry", "dogstatsd"] } libdd-remote-config = { path = "../libdd-remote-config" } datadog-live-debugger = { path = "../datadog-live-debugger" } -datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics"] } +datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics", "flagevaluation-evp"] } libdd-crashtracker = { path = "../libdd-crashtracker" } libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" } libdd-tinybytes = { path = "../libdd-tinybytes" } diff --git a/datadog-sidecar/src/self_telemetry.rs b/datadog-sidecar/src/self_telemetry.rs index 14f94ebf7e..9d08faa76e 100644 --- a/datadog-sidecar/src/self_telemetry.rs +++ b/datadog-sidecar/src/self_telemetry.rs @@ -2,6 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use crate::config::Config; use crate::log; +use crate::service::ffe_flagevaluation_flusher::{ + FLAG_EVALUATION_DEGRADED_EVALUATIONS_METRIC, FLAG_EVALUATION_DROPPED_EVALUATIONS_METRIC, + FLAG_EVALUATION_PAYLOAD_SPLITS_METRIC, FLAG_EVALUATION_REASON_CARDINALITY_CAP, + FLAG_EVALUATION_REASON_DEGRADED_CAP, FLAG_EVALUATION_REASON_PAYLOAD_LIMIT, +}; use crate::service::SidecarServer; use crate::watchdog::WatchdogHandle; use libdd_common::{tag, tag::Tag, MutexExt}; @@ -31,6 +36,9 @@ struct MetricData<'a> { trace_api_bytes: ContextKey, trace_chunks_sent: ContextKey, trace_chunks_dropped: ContextKey, + flagevaluation_evaluations_dropped: ContextKey, + flagevaluation_evaluations_degraded: ContextKey, + flagevaluation_payload_splits: ContextKey, } impl MetricData<'_> { async fn send(&self, key: ContextKey, value: f64, tags: Vec) { @@ -42,6 +50,10 @@ impl MetricData<'_> { async fn collect_and_send(&self) { let trace_metrics = self.server.trace_flusher.collect_metrics(); + let flagevaluation_writer_stats = self + .server + .ffe_flagevaluation_coalescer + .collect_writer_stats(); let submitted_payloads_delta = { let mut counters = self.server.connection_counters.lock_or_panic(); @@ -154,6 +166,41 @@ impl MetricData<'_> { ], )); } + if flagevaluation_writer_stats.rows_dropped_degraded_cap > 0 { + futures.push(self.send( + self.flagevaluation_evaluations_dropped, + flagevaluation_writer_stats.rows_dropped_degraded_cap as f64, + vec![tag!("reason", FLAG_EVALUATION_REASON_DEGRADED_CAP)], + )); + } + if flagevaluation_writer_stats.rows_dropped_payload_limit > 0 { + futures.push(self.send( + self.flagevaluation_evaluations_dropped, + flagevaluation_writer_stats.rows_dropped_payload_limit as f64, + vec![tag!("reason", FLAG_EVALUATION_REASON_PAYLOAD_LIMIT)], + )); + } + if flagevaluation_writer_stats.rows_degraded_cardinality_cap > 0 { + futures.push(self.send( + self.flagevaluation_evaluations_degraded, + flagevaluation_writer_stats.rows_degraded_cardinality_cap as f64, + vec![tag!("reason", FLAG_EVALUATION_REASON_CARDINALITY_CAP)], + )); + } + if flagevaluation_writer_stats.rows_degraded_payload_limit > 0 { + futures.push(self.send( + self.flagevaluation_evaluations_degraded, + flagevaluation_writer_stats.rows_degraded_payload_limit as f64, + vec![tag!("reason", FLAG_EVALUATION_REASON_PAYLOAD_LIMIT)], + )); + } + if flagevaluation_writer_stats.payload_splits > 0 { + futures.push(self.send( + self.flagevaluation_payload_splits, + flagevaluation_writer_stats.payload_splits as f64, + vec![], + )); + } futures::future::join_all(futures).await; } @@ -282,6 +329,27 @@ impl SelfTelemetry { true, MetricNamespace::Tracers, ), + flagevaluation_evaluations_dropped: worker.register_metric_context( + FLAG_EVALUATION_DROPPED_EVALUATIONS_METRIC.to_string(), + vec![], + MetricType::Count, + true, + MetricNamespace::Tracers, + ), + flagevaluation_evaluations_degraded: worker.register_metric_context( + FLAG_EVALUATION_DEGRADED_EVALUATIONS_METRIC.to_string(), + vec![], + MetricType::Count, + true, + MetricNamespace::Tracers, + ), + flagevaluation_payload_splits: worker.register_metric_context( + FLAG_EVALUATION_PAYLOAD_SPLITS_METRIC.to_string(), + vec![], + MetricType::Count, + true, + MetricNamespace::Tracers, + ), }; let _ = worker diff --git a/datadog-sidecar/src/service/evp_proxy.rs b/datadog-sidecar/src/service/evp_proxy.rs new file mode 100644 index 0000000000..7be31fa2e1 --- /dev/null +++ b/datadog-sidecar/src/service/evp_proxy.rs @@ -0,0 +1,10 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Shared EVP proxy constants for sidecar services. + +/// EVP subdomain header name. +pub(crate) const SUBDOMAIN_HEADER: &str = "X-Datadog-EVP-Subdomain"; + +/// EVP subdomain that routes requests to event-platform intake. +pub(crate) const EVENT_PLATFORM_INTAKE_SUBDOMAIN: &str = "event-platform-intake"; diff --git a/datadog-sidecar/src/service/ffe_evp_proxy.rs b/datadog-sidecar/src/service/ffe_evp_proxy.rs new file mode 100644 index 0000000000..88e4a4c330 --- /dev/null +++ b/datadog-sidecar/src/service/ffe_evp_proxy.rs @@ -0,0 +1,131 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Shared FFE EVP proxy transport helpers. +//! +//! FFE exposures and flagevaluation use different payload schemas, but both +//! are forwarded through the Agent EVP proxy with the same endpoint derivation, +//! subdomain header, timeout behavior, and fire-and-forget error handling. + +use crate::service::evp_proxy; +use http::uri::PathAndQuery; +use http::Method; +use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; +use libdd_common::Endpoint; +use std::time::Duration; +use tracing::{debug, warn}; + +pub(crate) use evp_proxy::EVENT_PLATFORM_INTAKE_SUBDOMAIN as EVP_SUBDOMAIN_VALUE; +pub(crate) use evp_proxy::SUBDOMAIN_HEADER as EVP_SUBDOMAIN_HEADER; + +const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); + +/// Build an Agent EVP proxy endpoint from a session's agent base endpoint. +/// Overrides only the path, preserving scheme, authority, timeout, and test_token. +/// Returns `None` for agentless mode because EVP proxy routing is agent-only. +pub(crate) fn endpoint(base: &Endpoint, path: &'static str) -> Option { + if base.api_key.is_some() { + return None; + } + + let mut parts = base.url.clone().into_parts(); + parts.path_and_query = Some(PathAndQuery::from_static(path)); + let url = http::Uri::from_parts(parts).ok()?; + Some(Endpoint { + url, + ..base.clone() + }) +} + +/// POST a JSON payload to the Agent EVP proxy. +/// +/// Fire-and-forget: non-2xx responses are logged at `warn`, network errors at +/// `debug`, and dropped. +pub(crate) async fn send_payload( + client: &C, + endpoint: &Endpoint, + payload: String, + log_prefix: &'static str, + success_name: &'static str, +) { + let builder = match endpoint.to_request_builder(USER_AGENT) { + Ok(b) => b, + Err(e) => { + debug!("{log_prefix}: failed to build request: {e:?}"); + return; + } + }; + + let req = match builder + .method(Method::POST) + .header("Content-Type", "application/json") + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .body(Bytes::from(payload)) + { + Ok(r) => r, + Err(e) => { + debug!("{log_prefix}: failed to construct request body: {e:?}"); + return; + } + }; + + let timeout = Duration::from_millis(endpoint.timeout_ms); + let response = tokio::select! { + biased; + result = client.request(req) => result, + _ = client.sleep(timeout) => { + debug!("{log_prefix}: request timed out after {timeout:?}"); + return; + } + }; + + match response { + Ok(resp) => { + let status = resp.status(); + if !status.is_success() { + let body_preview = truncate(resp.body().as_ref(), 256); + warn!("{log_prefix}: non-2xx response {status}: {body_preview}"); + } else { + debug!("{log_prefix}: sent {success_name}, status={status}"); + } + } + Err(e) => { + debug!("{log_prefix}: request failed: {e:?}"); + } + } +} + +fn truncate(bytes: &[u8], cap: usize) -> String { + let take = bytes.len().min(cap); + String::from_utf8_lossy(&bytes[..take]).into_owned() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn endpoint_preserves_authority_overrides_path() { + let base = Endpoint { + url: "http://agent.internal:8126/v0.4/traces".parse().unwrap(), + ..Endpoint::default() + }; + let ep = endpoint(&base, "/evp_proxy/v2/api/v2/exposures").unwrap(); + assert_eq!(ep.url.scheme_str(), Some("http")); + assert_eq!(ep.url.authority().unwrap().as_str(), "agent.internal:8126"); + assert_eq!(ep.url.path(), "/evp_proxy/v2/api/v2/exposures"); + } + + #[test] + fn endpoint_rejects_agentless() { + let base = Endpoint { + url: "https://trace.agent.datadoghq.com/v0.4/traces" + .parse() + .unwrap(), + api_key: Some("api-key".into()), + ..Endpoint::default() + }; + + assert!(endpoint(&base, "/evp_proxy/v2/api/v2/exposures").is_none()); + } +} diff --git a/datadog-sidecar/src/service/ffe_exposures_flusher.rs b/datadog-sidecar/src/service/ffe_exposures_flusher.rs index 996802694d..559536786f 100644 --- a/datadog-sidecar/src/service/ffe_exposures_flusher.rs +++ b/datadog-sidecar/src/service/ffe_exposures_flusher.rs @@ -8,41 +8,27 @@ //! dd-trace-dotnet: `POST /evp_proxy/v2/api/v2/exposures` with the header //! `X-Datadog-EVP-Subdomain: event-platform-intake`. No agent capability gate. +use crate::service::ffe_evp_proxy; use crate::service::FfeExposureBatch; use datadog_ffe::telemetry::exposures::encode_exposure_batch; pub(crate) use datadog_ffe::telemetry::exposures::ExposureDeduplicator; -use http::uri::PathAndQuery; -use http::Method; -use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; +#[cfg(test)] +use ffe_evp_proxy::{EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE}; +use libdd_capabilities::{HttpClientCapability, SleepCapability}; use libdd_common::Endpoint; -use std::time::Duration; -use tracing::{debug, warn}; +use tracing::debug; /// EVP proxy path for FFE exposure intake. pub(crate) const EVP_EXPOSURES_PATH: &str = "/evp_proxy/v2/api/v2/exposures"; -/// EVP subdomain that routes requests to event-platform intake. -pub(crate) const EVP_SUBDOMAIN_HEADER: &str = "X-Datadog-EVP-Subdomain"; -pub(crate) const EVP_SUBDOMAIN_VALUE: &str = "event-platform-intake"; - -const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); +const LOG_PREFIX: &str = "ffe_exposures_flusher"; /// Build the FFE exposure endpoint from a session's agent base endpoint. /// Overrides only the path (`/evp_proxy/v2/api/v2/exposures`), preserving /// scheme, authority, timeout, and test_token. /// Returns `None` for agentless mode because EVP proxy routing is agent-only. pub(crate) fn exposure_endpoint(base: &Endpoint) -> Option { - if base.api_key.is_some() { - return None; - } - - let mut parts = base.url.clone().into_parts(); - parts.path_and_query = Some(PathAndQuery::from_static(EVP_EXPOSURES_PATH)); - let url = http::Uri::from_parts(parts).ok()?; - Some(Endpoint { - url, - ..base.clone() - }) + ffe_evp_proxy::endpoint(base, EVP_EXPOSURES_PATH) } /// POST a structured FFE exposure batch to the agent EVP proxy. @@ -62,64 +48,7 @@ pub(crate) async fn send_batch( return; } }; - send_payload(client, endpoint, payload).await; -} - -async fn send_payload( - client: &C, - endpoint: &Endpoint, - payload: String, -) { - let builder = match endpoint.to_request_builder(USER_AGENT) { - Ok(b) => b, - Err(e) => { - debug!("ffe_exposures_flusher: failed to build request: {e:?}"); - return; - } - }; - - let req = match builder - .method(Method::POST) - .header("Content-Type", "application/json") - .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) - .body(Bytes::from(payload)) - { - Ok(r) => r, - Err(e) => { - debug!("ffe_exposures_flusher: failed to construct request body: {e:?}"); - return; - } - }; - - let timeout = Duration::from_millis(endpoint.timeout_ms); - let response = tokio::select! { - biased; - result = client.request(req) => result, - _ = client.sleep(timeout) => { - debug!("ffe_exposures_flusher: request timed out after {timeout:?}"); - return; - } - }; - - match response { - Ok(resp) => { - let status = resp.status(); - if !status.is_success() { - let body_preview = truncate(resp.body().as_ref(), 256); - warn!("ffe_exposures_flusher: non-2xx response {status}: {body_preview}"); - } else { - debug!("ffe_exposures_flusher: sent exposure batch, status={status}"); - } - } - Err(e) => { - debug!("ffe_exposures_flusher: request failed: {e:?}"); - } - } -} - -fn truncate(bytes: &[u8], cap: usize) -> String { - let take = bytes.len().min(cap); - String::from_utf8_lossy(&bytes[..take]).into_owned() + ffe_evp_proxy::send_payload(client, endpoint, payload, LOG_PREFIX, "exposure batch").await; } #[cfg(test)] @@ -127,9 +56,10 @@ mod tests { use super::*; use crate::service::{FfeExposure, FfeTelemetryContext}; use httpmock::MockServer; - use libdd_capabilities::{HttpError, MaybeSend}; + use libdd_capabilities::{Bytes, HttpError, MaybeSend}; use libdd_capabilities_impl::NativeCapabilities; use std::future; + use std::time::Duration; fn endpoint_for(server: &MockServer) -> Endpoint { Endpoint { diff --git a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs new file mode 100644 index 0000000000..fdf9e808c1 --- /dev/null +++ b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs @@ -0,0 +1,306 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Coalesces sidecar FFE (Feature Flag Evaluation) flag evaluation batches and +//! dispatches them through the shared `datadog-ffe` EVP sender. +//! +//! Protocol: `POST /evp_proxy/v2/api/v2/flagevaluation` with the header +//! `X-Datadog-EVP-Subdomain: event-platform-intake`. Fire-and-forget: non-2xx +//! responses and network errors are logged at `debug` and dropped +//! (matches dd-trace-go behaviour). No agent capability gate. + +use crate::service::{FfeFlagEvaluationBatch, FfeTelemetryContext}; +use datadog_ffe::telemetry::flagevaluation::{ + flagevaluation_agent_proxy_endpoint, send_flag_evaluation_batch, + FlagEvaluationEvpCoalescer as CommonFlagEvaluationEvpCoalescer, FlagEvaluationEvpSendConfig, + FlagEvaluationEvpWriterStats, +}; +use libdd_capabilities_impl::NativeCapabilities; +use libdd_common::Endpoint; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex as AsyncMutex; + +const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); +const COALESCE_DELAY: Duration = Duration::from_millis(250); + +pub(crate) const FLAG_EVALUATION_DROPPED_EVALUATIONS_METRIC: &str = + "flagevaluation.evaluations.dropped"; +pub(crate) const FLAG_EVALUATION_DEGRADED_EVALUATIONS_METRIC: &str = + "flagevaluation.evaluations.degraded"; +pub(crate) const FLAG_EVALUATION_PAYLOAD_SPLITS_METRIC: &str = "flagevaluation.payload.splits"; + +pub(crate) const FLAG_EVALUATION_REASON_DEGRADED_CAP: &str = "degraded_cap"; +pub(crate) const FLAG_EVALUATION_REASON_CARDINALITY_CAP: &str = "cardinality_cap"; +pub(crate) const FLAG_EVALUATION_REASON_PAYLOAD_LIMIT: &str = "payload_limit"; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct DestinationKey { + endpoint: Endpoint, + context: FfeTelemetryContext, +} + +impl DestinationKey { + fn new(endpoint: Endpoint, context: &FfeTelemetryContext) -> Self { + Self { + endpoint, + context: context.clone(), + } + } +} + +#[derive(Clone, Default)] +pub(crate) struct FlagEvaluationCoalescer { + inner: CommonFlagEvaluationEvpCoalescer, + flush_mutex: Arc>, +} + +impl FlagEvaluationCoalescer { + pub(crate) fn enqueue( + &self, + client: NativeCapabilities, + endpoint: Endpoint, + batch: FfeFlagEvaluationBatch, + ) { + let destination_key = DestinationKey::new(endpoint, &batch.context); + if self.inner.enqueue(destination_key, batch) { + let coalescer = self.clone(); + tokio::spawn(async move { + coalescer.flush_loop(client).await; + }); + } + } + + pub(crate) async fn flush_now(&self, client: NativeCapabilities) { + let _guard = self.flush_mutex.lock().await; + self.flush_available_batches(client).await; + } + + async fn flush_available_batches(&self, client: NativeCapabilities) { + let batches = self.inner.take_batches(); + futures::future::join_all(batches.into_iter().map(|(destination, batch)| { + let client = client.clone(); + let coalescer = self.inner.clone(); + async move { + send_batch_with_writer_stats(&client, &destination.endpoint, batch, &coalescer) + .await + } + })) + .await; + } + + async fn flush_loop(self, client: NativeCapabilities) { + loop { + tokio::time::sleep(COALESCE_DELAY).await; + { + let _guard = self.flush_mutex.lock().await; + self.flush_available_batches(client.clone()).await; + } + + if self.inner.finish_flush_cycle() { + break; + } + } + } + + pub(crate) fn collect_writer_stats(&self) -> FlagEvaluationEvpWriterStats { + self.inner.collect_writer_stats() + } +} + +/// Build the FFE flagevaluation endpoint from a session's agent base endpoint. +/// Overrides only the path (`/evp_proxy/v2/api/v2/flagevaluation`), preserving +/// scheme, authority, timeout, and test_token. +/// Returns `None` for agentless mode because EVP proxy routing is agent-only. +pub(crate) fn flagevaluation_endpoint(base: &Endpoint) -> Option { + flagevaluation_agent_proxy_endpoint(base) +} + +async fn send_batch_with_writer_stats( + client: &NativeCapabilities, + endpoint: &Endpoint, + batch: FfeFlagEvaluationBatch, + coalescer: &CommonFlagEvaluationEvpCoalescer, +) { + let config = FlagEvaluationEvpSendConfig::new(USER_AGENT); + if let Some(result) = send_flag_evaluation_batch(client, endpoint, batch, &config).await { + coalescer.record_payload_build_result(&result); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::service::{FfeFlagEvaluationBatch, FfeTelemetryContext}; + use datadog_ffe::telemetry::flagevaluation::{ + FfeFlagEvaluationEvent, FlagEvalEventContext, FlagKey, EVP_FLAGEVALUATION_PATH, + }; + use httpmock::MockServer; + use libdd_capabilities::HttpClientCapability; + use libdd_capabilities_impl::NativeCapabilities; + use std::collections::BTreeMap; + + fn endpoint_for(server: &MockServer) -> Endpoint { + Endpoint { + url: server.url("/").parse().unwrap(), + ..Endpoint::default() + } + } + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn eval_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 5, + variant: None, + allocation: None, + targeting_rule: None, + targeting_key: None, + // `evaluation` is carried as a JSON-object STRING on the wire (bincode + // can't carry serde_json::Value); the flusher re-expands it to an object. + context: Some(FlagEvalEventContext { + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("country".to_owned(), serde_json::json!("US")); + m + }) + .unwrap(), + ), + dd: None, + }), + error: None, + runtime_default_used: false, + } + } + + fn batch() -> FfeFlagEvaluationBatch { + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![eval_event()], + } + } + + #[test] + fn self_telemetry_metric_names_describe_evaluation_count_units() { + assert_eq!( + FLAG_EVALUATION_DROPPED_EVALUATIONS_METRIC, + "flagevaluation.evaluations.dropped" + ); + assert_eq!( + FLAG_EVALUATION_DEGRADED_EVALUATIONS_METRIC, + "flagevaluation.evaluations.degraded" + ); + assert_eq!( + FLAG_EVALUATION_PAYLOAD_SPLITS_METRIC, + "flagevaluation.payload.splits" + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn coalesces_identical_batches_before_posting() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATION_PATH) + .body_includes("\"evaluation_count\":10"); + then.status(202); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + let coalescer = FlagEvaluationCoalescer::default(); + + coalescer.enqueue(client.clone(), ep.clone(), batch()); + coalescer.enqueue(client.clone(), ep, batch()); + + for _ in 0..100 { + if mock.calls_async().await == 1 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + + mock.assert_calls_async(1).await; + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn flush_now_waits_for_in_flight_flush_section() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATION_PATH); + then.status(202); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + let coalescer = FlagEvaluationCoalescer::default(); + let guard = coalescer.flush_mutex.lock().await; + + coalescer.enqueue(client.clone(), ep, batch()); + + let mut flush = tokio::spawn({ + let coalescer = coalescer.clone(); + async move { + coalescer.flush_now(client).await; + } + }); + + assert!( + tokio::time::timeout(Duration::from_millis(50), &mut flush) + .await + .is_err(), + "flush_now returned while another FFE flush section was in flight" + ); + assert_eq!(mock.calls_async().await, 0); + + drop(guard); + flush.await.unwrap(); + mock.assert_calls_async(1).await; + } + + #[test] + fn endpoint_preserves_authority_overrides_path() { + let base = Endpoint { + url: "http://agent.internal:8126/v0.4/traces".parse().unwrap(), + ..Endpoint::default() + }; + let ep = flagevaluation_endpoint(&base).unwrap(); + assert_eq!(ep.url.scheme_str(), Some("http")); + assert_eq!(ep.url.authority().unwrap().as_str(), "agent.internal:8126"); + assert_eq!(ep.url.path(), EVP_FLAGEVALUATION_PATH); + } + + #[test] + fn endpoint_rejects_agentless() { + let base = Endpoint { + url: "https://trace.agent.datadoghq.com/v0.4/traces" + .parse() + .unwrap(), + api_key: Some("api-key".into()), + ..Endpoint::default() + }; + assert!(flagevaluation_endpoint(&base).is_none()); + } +} diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index a7019781ff..4f884ca09e 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -5,6 +5,11 @@ use crate::config; pub use datadog_ffe::telemetry::evaluation_metrics::FfeEvaluationMetric; pub use datadog_ffe::telemetry::exposures::{FfeExposure, FfeExposureBatch}; +pub use datadog_ffe::telemetry::flagevaluation::{ + AllocationKey, ContextDD, EvalError, FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, + FlagEvalEventContext, FlagKey, TargetingRuleKey, VariantKey, MAX_CONTEXT_DEPTH, + MAX_CONTEXT_FIELDS, MAX_FIELD_LENGTH, +}; pub use datadog_ffe::telemetry::FfeTelemetryContext; use libdd_common::tag::Tag; use libdd_common::Endpoint; @@ -30,8 +35,11 @@ pub(crate) use sidecar_interface::SidecarInterface; pub mod agent_info; pub mod blocking; mod debugger_diagnostics_bookkeeper; +pub(crate) mod evp_proxy; pub mod exception_hash_rate_limiter; +pub(crate) mod ffe_evp_proxy; pub(crate) mod ffe_exposures_flusher; +pub(crate) mod ffe_flagevaluation_flusher; pub(crate) mod ffe_metrics_flusher; mod instance_id; mod queue_id; @@ -100,4 +108,12 @@ pub enum SidecarAction { context: FfeTelemetryContext, metrics: Vec, }, + /// Structured FFE flag evaluation batch for the EVP flagevaluation track. + /// The sidecar serializes and POSTs the batch to + /// `/evp_proxy/v2/api/v2/flagevaluation` (fire-and-forget). + /// + /// Keep this appended after pre-existing variants: this enum crosses the + /// bincode sidecar IPC boundary, so inserting a variant before existing + /// variants changes their wire ordinals. + FfeFlagEvaluationBatch(FfeFlagEvaluationBatch), } diff --git a/datadog-sidecar/src/service/sidecar_interface.rs b/datadog-sidecar/src/service/sidecar_interface.rs index a58c6fcbde..c1d232efcf 100644 --- a/datadog-sidecar/src/service/sidecar_interface.rs +++ b/datadog-sidecar/src/service/sidecar_interface.rs @@ -26,6 +26,7 @@ pub enum DynamicInstrumentationConfigState { #[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] pub struct SidecarFlushOptions { pub traces_and_stats: bool, + pub flag_evaluations: bool, pub telemetry: bool, } @@ -220,7 +221,8 @@ pub trait SidecarInterface { /// * `actions` - The DogStatsD actions to send. async fn send_dogstatsd_actions(instance_id: InstanceId, actions: Vec); - /// Flushes outstanding traces/stats and/or telemetry, as specified by options. + /// Flushes outstanding traces/stats, flag evaluations, and/or telemetry, as specified by + /// options. #[blocking] async fn flush(options: SidecarFlushOptions); diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index ea16ef1d6c..071fd36f24 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -37,6 +37,7 @@ use crate::service::debugger_diagnostics_bookkeeper::{ }; use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; use crate::service::ffe_exposures_flusher; +use crate::service::ffe_flagevaluation_flusher; use crate::service::ffe_metrics_flusher; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::stats_flusher::{ @@ -117,6 +118,8 @@ pub struct SidecarServer { pub(crate) ffe_http_client: NativeCapabilities, /// Sidecar-owned exposure cache, shared across sessions/connections. pub(crate) ffe_exposure_deduplicator: ffe_exposures_flusher::ExposureDeduplicator, + /// Sidecar-owned EVP flagevaluation coalescer, shared across PHP request lifetimes. + pub(crate) ffe_flagevaluation_coalescer: ffe_flagevaluation_flusher::FlagEvaluationCoalescer, } /// Per-connection handler wrapper that tracks sessions/instances for cleanup on disconnect. @@ -446,6 +449,27 @@ impl SidecarInterface for ConnectionSidecarHandler { } false } + SidecarAction::FfeFlagEvaluationBatch(batch) => { + if let Some(base) = trace_config.endpoint.as_ref() { + if let Some(ep) = ffe_flagevaluation_flusher::flagevaluation_endpoint(base) + { + self.server.ffe_flagevaluation_coalescer.enqueue( + ffe_http_client.clone(), + ep, + batch.clone(), + ); + } else { + debug!( + "ffe_flagevaluation_flusher: could not derive endpoint, dropping batch" + ); + } + } else { + debug!( + "ffe_flagevaluation_flusher: no session endpoint, dropping batch" + ); + } + false + } SidecarAction::FfeEvaluationMetrics { context, metrics } => { if let Some(ep) = session.get_otlp_metrics_endpoint().clone() { let client = ffe_http_client.clone(); @@ -1083,14 +1107,30 @@ impl SidecarInterface for ConnectionSidecarHandler { } async fn flush(&self, _peer: PeerCredentials, options: SidecarFlushOptions) { - if options.traces_and_stats { - let flusher = self.server.trace_flusher.clone(); - if let Err(e) = tokio::spawn(async move { flusher.flush().await }).await { - error!("Failed flushing traces: {e:?}"); + let flag_evaluations = options.flag_evaluations; + let ffe_coalescer = self.server.ffe_flagevaluation_coalescer.clone(); + let ffe_http_client = self.server.ffe_http_client.clone(); + let flush_flag_evaluations = async move { + if flag_evaluations { + ffe_coalescer.flush_now(ffe_http_client).await; } - flush_all_stats_now(&self.server.span_concentrators).await; - debug!("Finished executing flush() for traces and stats") - } + }; + + let traces_and_stats = options.traces_and_stats; + let flusher = self.server.trace_flusher.clone(); + let span_concentrators = self.server.span_concentrators.clone(); + let flush_traces_and_stats = async move { + if traces_and_stats { + if let Err(e) = tokio::spawn(async move { flusher.flush().await }).await { + error!("Failed flushing traces: {e:?}"); + } + flush_all_stats_now(&span_concentrators).await; + debug!("Finished executing flush() for traces and stats") + } + }; + + tokio::join!(flush_flag_evaluations, flush_traces_and_stats); + if options.telemetry { let workers: Vec<_> = { let clients = self.server.telemetry_clients.inner.lock_or_panic(); @@ -1173,7 +1213,11 @@ impl SidecarInterface for ConnectionSidecarHandler { #[cfg(test)] mod tests { use super::*; - use crate::service::{FfeEvaluationMetric, FfeExposure, FfeExposureBatch, FfeTelemetryContext}; + use crate::service::{ + FfeEvaluationMetric, FfeExposure, FfeExposureBatch, FfeFlagEvaluationBatch, + FfeFlagEvaluationEvent, FfeTelemetryContext, FlagKey, + }; + use datadog_ffe::telemetry::flagevaluation::EVP_FLAGEVALUATION_PATH; use httpmock::{Method::POST, MockServer}; use tokio::time::{sleep, Duration as TokioDuration}; @@ -1206,6 +1250,28 @@ mod tests { } } + fn ffe_flag_evaluation_batch() -> FfeFlagEvaluationBatch { + FfeFlagEvaluationBatch { + context: ffe_context(), + flag_evaluations: vec![FfeFlagEvaluationEvent { + timestamp: 123, + flag: FlagKey { + key: "flag".to_owned(), + }, + first_evaluation: 100, + last_evaluation: 123, + evaluation_count: 1, + variant: None, + allocation: None, + targeting_rule: None, + targeting_key: None, + context: None, + error: None, + runtime_default_used: false, + }], + } + } + #[tokio::test] #[cfg_attr(miri, ignore)] async fn ffe_exposure_actions_dispatch_without_registered_application() { @@ -1328,6 +1394,62 @@ mod tests { .contains_key(&queue_id)); } + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn flush_options_control_ffe_flag_evaluations() { + let http_server = MockServer::start_async().await; + let flag_evaluations_mock = http_server + .mock_async(|when, then| { + when.method(POST).path(EVP_FLAGEVALUATION_PATH); + then.status(202); + }) + .await; + + let handler = ConnectionSidecarHandler::new(SidecarServer::default()); + let instance_id = InstanceId::new("session", "runtime"); + let queue_id = QueueId::from(42); + + handler + .server + .get_session(&instance_id.session_id) + .modify_trace_config(|cfg| { + let endpoint = Endpoint { + url: http_server.url("/").parse().unwrap(), + ..Endpoint::default() + }; + cfg.set_endpoint(endpoint).unwrap(); + }); + + handler + .enqueue_actions( + PeerCredentials::default(), + instance_id, + queue_id, + vec![SidecarAction::FfeFlagEvaluationBatch( + ffe_flag_evaluation_batch(), + )], + ) + .await; + + handler + .flush(PeerCredentials::default(), SidecarFlushOptions::default()) + .await; + sleep(TokioDuration::from_millis(50)).await; + assert_eq!(flag_evaluations_mock.calls_async().await, 0); + + handler + .flush( + PeerCredentials::default(), + SidecarFlushOptions { + flag_evaluations: true, + ..SidecarFlushOptions::default() + }, + ) + .await; + + flag_evaluations_mock.assert_calls_async(1).await; + } + #[tokio::test] #[cfg_attr(miri, ignore)] async fn registered_sdk_without_ffe_actions_does_not_emit_ffe_telemetry() { diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 3e29f1c95d..134903a155 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -455,6 +455,7 @@ impl TelemetryCachedClient { } SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately SidecarAction::FfeExposureBatch(_) => {} // handled in sidecar_server + SidecarAction::FfeFlagEvaluationBatch(_) => {} // handled in sidecar_server SidecarAction::FfeEvaluationMetrics { .. } => {} // handled in sidecar_server } }