From 06d7629db7e751bc08b40be40ecd302b7b2aff92 Mon Sep 17 00:00:00 2001 From: vianney Date: Wed, 24 Jun 2026 18:32:03 +0200 Subject: [PATCH 1/2] feat(trace_exporter): enable telemetry in stats exporter --- libdd-data-pipeline-ffi/src/trace_exporter.rs | 47 +++++++++++++++++++ libdd-data-pipeline/src/telemetry/mod.rs | 5 ++ .../src/trace_exporter/builder.rs | 22 +++++++-- libdd-data-pipeline/src/trace_exporter/mod.rs | 14 ++++-- .../src/trace_exporter/stats.rs | 13 +++-- 5 files changed, 91 insertions(+), 10 deletions(-) diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index f3173b9410..c3a68962a4 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -90,6 +90,7 @@ pub struct TraceExporterConfig { otlp_protocol: Option, output_to_log: bool, log_max_line_size: Option, + stats_cardinality_limit: Option, } #[no_mangle] @@ -543,6 +544,30 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol( ) } +/// Sets the cardinality limit for client-side stats computation. +/// +/// When the number of distinct stats groups exceeds `limit`, additional groups are +/// aggregated into a sentinel key instead of being tracked individually. +/// This bounds memory usage when the trace population has very high cardinality. +/// +/// Has no effect unless stats computation is enabled via +/// `ddog_trace_exporter_config_set_compute_stats`. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_stats_cardinality_limit( + config: Option<&mut TraceExporterConfig>, + limit: usize, +) -> Option> { + catch_panic!( + if let Some(handle) = config { + handle.stats_cardinality_limit = Some(limit); + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Configure the exporter to write traces as newline-delimited JSON to stdout (the Datadog /// Forwarder "log exporter" path) instead of sending them to a Datadog agent. Used in serverless /// environments (e.g. AWS Lambda) when no agent is reachable. @@ -620,6 +645,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( .set_output_format(config.output_format) .set_connection_timeout(config.connection_timeout); + if let Some(limit) = config.stats_cardinality_limit { + builder.set_stats_cardinality_limit(limit); + } + if config.compute_stats { builder.enable_stats(Duration::from_secs(10)); } else if config.client_computed_stats { @@ -753,6 +782,7 @@ mod tests { assert!(cfg.connection_timeout.is_none()); assert!(!cfg.output_to_log); assert_eq!(cfg.log_max_line_size, None); + assert_eq!(cfg.stats_cardinality_limit, None); ddog_trace_exporter_config_free(cfg); } @@ -1467,6 +1497,23 @@ mod tests { assert_eq!(ret.unwrap().code, ErrorCode::Panic); } + #[test] + fn config_stats_cardinality_limit_test() { + unsafe { + // Null config → InvalidArgument + let error = ddog_trace_exporter_config_set_stats_cardinality_limit(None, 100); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // Valid config → value stored + let mut config = Some(TraceExporterConfig::default()); + let error = + ddog_trace_exporter_config_set_stats_cardinality_limit(config.as_mut(), 500); + assert_eq!(error, None); + assert_eq!(config.unwrap().stats_cardinality_limit, Some(500)); + } + } + #[test] fn config_health_metrics_test() { unsafe { diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 10649f87ab..abf2cea494 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -325,6 +325,11 @@ impl TelemetryClient { .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await; } + + /// Clone the telemetry handle + pub fn clone_handle(&self) -> TelemetryWorkerHandle { + self.worker.clone() + } } #[cfg(test)] diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 9c2970a475..5582c934cd 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -75,6 +75,7 @@ pub struct TraceExporterBuilder { peer_tags_aggregation: bool, compute_stats_by_span_kind: bool, peer_tags: Vec, + stats_cardinality_limit: Option, #[cfg(feature = "stats-obfuscation")] client_side_stats_obfuscation_enabled: bool, #[cfg(feature = "telemetry")] @@ -142,6 +143,7 @@ impl TraceExporterBuilder { peer_tags_aggregation: false, compute_stats_by_span_kind: false, peer_tags: Vec::new(), + stats_cardinality_limit: None, #[cfg(feature = "stats-obfuscation")] client_side_stats_obfuscation_enabled: false, #[cfg(feature = "telemetry")] @@ -324,6 +326,18 @@ impl TraceExporterBuilder { self } + /// Sets the cardinality limit for client-side stats computation. + /// + /// When the number of distinct stats groups exceeds `limit`, additional groups are + /// aggregated into a sentinel key instead of being tracked individually. + /// This bounds memory usage when the trace population has very high cardinality. + /// + /// Has no effect unless stats computation is enabled. + pub fn set_stats_cardinality_limit(&mut self, cardinality_limit: usize) -> &mut Self { + self.stats_cardinality_limit = Some(cardinality_limit); + self + } + /// Enable client-side stats obfuscation. Disabled by default. /// /// Final activation also requires the agent to advertise a supported @@ -561,10 +575,9 @@ impl TraceExporterBuilder { })?), }; - let dogstatsd = self.dogstatsd_url.and_then(|u| { - new(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return - // None - }); + let dogstatsd = self + .dogstatsd_url + .and_then(|u| new(Endpoint::from_slice(&u)).ok().map(Arc::new)); let base_url = self.url.as_deref().unwrap_or(DEFAULT_AGENT_URL); @@ -807,6 +820,7 @@ impl TraceExporterBuilder { common_stats_tags: vec![libdatadog_version], client_side_stats: StatsComputationConfig { status: ArcSwap::new(stats.into()), + stats_cardinality_limit: self.stats_cardinality_limit, #[cfg(feature = "stats-obfuscation")] obfuscation_config: Arc::new(ArcSwap::from_pointee( StatsComputationObfuscationConfig::default(), diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 52e42dcd67..44ead972e1 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -253,7 +253,7 @@ pub struct TraceExporter< serializer: TraceSerializer, shared_runtime: Arc, /// None if dogstatsd is disabled - dogstatsd: Option, + dogstatsd: Option>, common_stats_tags: Vec, client_computed_top_level: bool, client_side_stats: StatsComputationConfig, @@ -469,6 +469,14 @@ impl< metadata: &self.metadata, endpoint_url: &self.endpoint.url, shared_runtime: &*self.shared_runtime, + stats_cardinality_limit: self.client_side_stats.stats_cardinality_limit, + dogstatsd: if self.health_metrics_enabled { + self.dogstatsd.clone() + } else { + None + }, + #[cfg(feature = "telemetry")] + telemetry: self.telemetry.as_ref().map(|t| t.clone_handle()), }; stats::handle_stats_disabled_by_agent( &ctx, @@ -559,7 +567,7 @@ impl< /// Emit a health metric to dogstatsd fn emit_metric(&self, metric: HealthMetric, custom_tags: Option>) { if self.health_metrics_enabled { - let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags); + let emitter = MetricsEmitter::new(self.dogstatsd.as_deref(), &self.common_stats_tags); emitter.emit(metric, custom_tags); } } @@ -567,7 +575,7 @@ impl< /// Emit all health metrics from a SendResult fn emit_send_result(&self, result: &SendResult) { if self.health_metrics_enabled { - let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags); + let emitter = MetricsEmitter::new(self.dogstatsd.as_deref(), &self.common_stats_tags); emitter.emit_from_send_result(result); } } diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 81996006d6..2fdaa29728 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -49,6 +49,12 @@ pub(crate) struct StatsContext<'a, R: SharedRuntime> { pub metadata: &'a TracerMetadata, pub endpoint_url: &'a http::Uri, pub shared_runtime: &'a R, + pub stats_cardinality_limit: Option, + /// Optional DogStatsD client forwarded to the [`StatsExporter`]. + pub dogstatsd: Option>, + /// Optional telemetry handle forwarded to the [`StatsExporter`]. + #[cfg(feature = "telemetry")] + pub telemetry: Option, } #[derive(Debug)] @@ -71,6 +77,7 @@ pub(crate) enum StatsComputationStatus { #[cfg_attr(target_arch = "wasm32", allow(dead_code))] pub(crate) struct StatsComputationConfig { pub(crate) status: ArcSwap, + pub(crate) stats_cardinality_limit: Option, #[cfg(feature = "stats-obfuscation")] pub(crate) obfuscation_config: SharedStatsComputationObfuscationConfig, /// Builder-level opt-in. When false, stats obfuscation stays off @@ -135,7 +142,7 @@ pub(crate) fn start_stats_computation< std::time::SystemTime::now(), span_kinds, peer_tags, - None, + ctx.stats_cardinality_limit, #[cfg(feature = "stats-obfuscation")] Some(client_side_stats.obfuscation_config.clone()), ))); @@ -167,8 +174,8 @@ fn create_and_start_stats_worker< #[cfg(feature = "stats-obfuscation")] SUPPORTED_OBFUSCATION_VERSION_STR, #[cfg(feature = "telemetry")] - None, - None, + ctx.telemetry.clone(), + ctx.dogstatsd.clone(), ); let worker_handle = ctx .shared_runtime From 26b5e276cd5a0dc8e3919c43580a3662e21a598e Mon Sep 17 00:00:00 2001 From: vianney Date: Mon, 29 Jun 2026 16:16:01 +0200 Subject: [PATCH 2/2] feat(sidecar): enable telemetry for stats --- datadog-sidecar/src/service/sidecar_server.rs | 1 + datadog-sidecar/src/service/stats_flusher.rs | 64 +++++++++++++++++-- 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index ea16ef1d6c..37b30235b5 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -1071,6 +1071,7 @@ impl SidecarInterface for ConnectionSidecarHandler { // Lazily create the concentrator on first IPC span for this (env, version, service). if let Some(state) = get_or_create_concentrator( &self.server.span_concentrators, + &self.server.telemetry_clients, &env, &version, session_id, diff --git a/datadog-sidecar/src/service/stats_flusher.rs b/datadog-sidecar/src/service/stats_flusher.rs index 13332bbb82..91e8d56d80 100644 --- a/datadog-sidecar/src/service/stats_flusher.rs +++ b/datadog-sidecar/src/service/stats_flusher.rs @@ -9,6 +9,7 @@ //! automatically once idle: an empty drain sets the `please_reload` bit (telling PHP workers //! to stop writing), and the subsequent flush performs a final drain before removal. +use crate::service::{InstanceId, RuntimeMetadata}; use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; use datadog_ipc::shm_stats::{ @@ -18,6 +19,8 @@ use futures::{future::join_all, TryFutureExt}; use http::uri::PathAndQuery; use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities}; use libdd_common::{Endpoint, MutexExt}; +use libdd_telemetry::config::Config; +use libdd_telemetry::worker::TelemetryWorkerHandle; use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata}; use std::collections::HashMap; use std::ffi::CString; @@ -104,6 +107,7 @@ fn make_exporter( s: &SpanConcentratorState, endpoint: Endpoint, flush_interval: Duration, + telemetry: Option, ) -> StatsExporter { StatsExporter::new( flush_interval, @@ -119,7 +123,7 @@ fn make_exporter( )), #[cfg(feature = "stats-obfuscation")] "0", - None, + telemetry, None, ) } @@ -135,6 +139,7 @@ pub async fn run_stats_flush_loop( states: Weak>>>, map_key: ConcentratorKey, flush_interval: Duration, + telemetry: Option, ) { let Some(arc) = states.upgrade() else { return; @@ -146,7 +151,13 @@ pub async fn run_stats_flush_loop( let Some(state) = state else { return; }; - let exporter = make_exporter(&state, state.endpoint.clone(), flush_interval); + + let exporter = make_exporter( + &state, + state.endpoint.clone(), + flush_interval, + telemetry.clone(), + ); loop { tokio::time::sleep(flush_interval).await; @@ -191,7 +202,11 @@ pub async fn run_stats_flush_loop( guard.remove(&map_key); } } - if let Err(e) = exporter.send(true).await { + if let Err(e) = + make_exporter(&state, state.endpoint.clone(), flush_interval, telemetry) + .send(true) + .await + { warn!("Failed final stats flush: {e}"); } break; @@ -210,6 +225,7 @@ pub async fn run_stats_flush_loop( /// Returns `None` when stats config is not available (agentless or not yet configured). pub(crate) fn get_or_create_concentrator( concentrators: &Arc>>>, + telemetry_clients: &crate::service::telemetry::TelemetryCachedClientSet, env: &str, version: &str, runtime_id: &str, @@ -270,8 +286,46 @@ pub(crate) fn get_or_create_concentrator( guard.insert(map_key.clone(), state.clone()); let weak = Arc::downgrade(concentrators); let flush_interval = config.flush_interval; + + let trace_config = session.get_trace_config(); + let runtime_metadata = RuntimeMetadata::new( + trace_config.language.clone(), + trace_config.language_version.clone(), + trace_config.tracer_version.clone(), + ); + drop(trace_config); + let process_tags = session.process_tags.lock_or_panic().clone(); + let instance_id = InstanceId { + session_id: runtime_id.to_owned(), + runtime_id: runtime_id.to_owned(), + }; + let telemetry = { + let telemetry_mutex = telemetry_clients.get_or_create( + &service_name, + env, + &instance_id, + &runtime_metadata, + || {session + .session_config + .lock_or_panic() + .as_ref() + .cloned() + .unwrap_or_else(|| { + warn!("Session telemetry config unavailable for env={env} version={version} service={service_name}; telemetry disabled in stats"); + Config::default() + }) +}, + process_tags, + ); + let worker = telemetry_mutex + .lock_or_panic() + .as_ref() + .map(|c| c.worker.clone()); + worker + }; + tokio::spawn(async move { - run_stats_flush_loop(weak, map_key, flush_interval).await; + run_stats_flush_loop(weak, map_key, flush_interval, telemetry).await; }); Some(state) } @@ -292,7 +346,7 @@ pub async fn flush_all_stats_now( .values() .map(|s| { ( - make_exporter(s, s.endpoint.clone(), Duration::from_secs(10)), + make_exporter(s, s.endpoint.clone(), Duration::from_secs(10), None), s.endpoint.clone(), ) })