diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index f3173b9410..c3a68962a4 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -90,6 +90,7 @@ pub struct TraceExporterConfig { otlp_protocol: Option, output_to_log: bool, log_max_line_size: Option, + stats_cardinality_limit: Option, } #[no_mangle] @@ -543,6 +544,30 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol( ) } +/// Sets the cardinality limit for client-side stats computation. +/// +/// When the number of distinct stats groups exceeds `limit`, additional groups are +/// aggregated into a sentinel key instead of being tracked individually. +/// This bounds memory usage when the trace population has very high cardinality. +/// +/// Has no effect unless stats computation is enabled via +/// `ddog_trace_exporter_config_set_compute_stats`. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_stats_cardinality_limit( + config: Option<&mut TraceExporterConfig>, + limit: usize, +) -> Option> { + catch_panic!( + if let Some(handle) = config { + handle.stats_cardinality_limit = Some(limit); + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Configure the exporter to write traces as newline-delimited JSON to stdout (the Datadog /// Forwarder "log exporter" path) instead of sending them to a Datadog agent. Used in serverless /// environments (e.g. AWS Lambda) when no agent is reachable. @@ -620,6 +645,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( .set_output_format(config.output_format) .set_connection_timeout(config.connection_timeout); + if let Some(limit) = config.stats_cardinality_limit { + builder.set_stats_cardinality_limit(limit); + } + if config.compute_stats { builder.enable_stats(Duration::from_secs(10)); } else if config.client_computed_stats { @@ -753,6 +782,7 @@ mod tests { assert!(cfg.connection_timeout.is_none()); assert!(!cfg.output_to_log); assert_eq!(cfg.log_max_line_size, None); + assert_eq!(cfg.stats_cardinality_limit, None); ddog_trace_exporter_config_free(cfg); } @@ -1467,6 +1497,23 @@ mod tests { assert_eq!(ret.unwrap().code, ErrorCode::Panic); } + #[test] + fn config_stats_cardinality_limit_test() { + unsafe { + // Null config → InvalidArgument + let error = ddog_trace_exporter_config_set_stats_cardinality_limit(None, 100); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // Valid config → value stored + let mut config = Some(TraceExporterConfig::default()); + let error = + ddog_trace_exporter_config_set_stats_cardinality_limit(config.as_mut(), 500); + assert_eq!(error, None); + assert_eq!(config.unwrap().stats_cardinality_limit, Some(500)); + } + } + #[test] fn config_health_metrics_test() { unsafe { diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 10649f87ab..abf2cea494 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -325,6 +325,11 @@ impl TelemetryClient { .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await; } + + /// Clone the telemetry handle + pub fn clone_handle(&self) -> TelemetryWorkerHandle { + self.worker.clone() + } } #[cfg(test)] diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 9c2970a475..5582c934cd 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -75,6 +75,7 @@ pub struct TraceExporterBuilder { peer_tags_aggregation: bool, compute_stats_by_span_kind: bool, peer_tags: Vec, + stats_cardinality_limit: Option, #[cfg(feature = "stats-obfuscation")] client_side_stats_obfuscation_enabled: bool, #[cfg(feature = "telemetry")] @@ -142,6 +143,7 @@ impl TraceExporterBuilder { peer_tags_aggregation: false, compute_stats_by_span_kind: false, peer_tags: Vec::new(), + stats_cardinality_limit: None, #[cfg(feature = "stats-obfuscation")] client_side_stats_obfuscation_enabled: false, #[cfg(feature = "telemetry")] @@ -324,6 +326,18 @@ impl TraceExporterBuilder { self } + /// Sets the cardinality limit for client-side stats computation. + /// + /// When the number of distinct stats groups exceeds `limit`, additional groups are + /// aggregated into a sentinel key instead of being tracked individually. + /// This bounds memory usage when the trace population has very high cardinality. + /// + /// Has no effect unless stats computation is enabled. + pub fn set_stats_cardinality_limit(&mut self, cardinality_limit: usize) -> &mut Self { + self.stats_cardinality_limit = Some(cardinality_limit); + self + } + /// Enable client-side stats obfuscation. Disabled by default. /// /// Final activation also requires the agent to advertise a supported @@ -561,10 +575,9 @@ impl TraceExporterBuilder { })?), }; - let dogstatsd = self.dogstatsd_url.and_then(|u| { - new(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return - // None - }); + let dogstatsd = self + .dogstatsd_url + .and_then(|u| new(Endpoint::from_slice(&u)).ok().map(Arc::new)); let base_url = self.url.as_deref().unwrap_or(DEFAULT_AGENT_URL); @@ -807,6 +820,7 @@ impl TraceExporterBuilder { common_stats_tags: vec![libdatadog_version], client_side_stats: StatsComputationConfig { status: ArcSwap::new(stats.into()), + stats_cardinality_limit: self.stats_cardinality_limit, #[cfg(feature = "stats-obfuscation")] obfuscation_config: Arc::new(ArcSwap::from_pointee( StatsComputationObfuscationConfig::default(), diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 52e42dcd67..44ead972e1 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -253,7 +253,7 @@ pub struct TraceExporter< serializer: TraceSerializer, shared_runtime: Arc, /// None if dogstatsd is disabled - dogstatsd: Option, + dogstatsd: Option>, common_stats_tags: Vec, client_computed_top_level: bool, client_side_stats: StatsComputationConfig, @@ -469,6 +469,14 @@ impl< metadata: &self.metadata, endpoint_url: &self.endpoint.url, shared_runtime: &*self.shared_runtime, + stats_cardinality_limit: self.client_side_stats.stats_cardinality_limit, + dogstatsd: if self.health_metrics_enabled { + self.dogstatsd.clone() + } else { + None + }, + #[cfg(feature = "telemetry")] + telemetry: self.telemetry.as_ref().map(|t| t.clone_handle()), }; stats::handle_stats_disabled_by_agent( &ctx, @@ -559,7 +567,7 @@ impl< /// Emit a health metric to dogstatsd fn emit_metric(&self, metric: HealthMetric, custom_tags: Option>) { if self.health_metrics_enabled { - let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags); + let emitter = MetricsEmitter::new(self.dogstatsd.as_deref(), &self.common_stats_tags); emitter.emit(metric, custom_tags); } } @@ -567,7 +575,7 @@ impl< /// Emit all health metrics from a SendResult fn emit_send_result(&self, result: &SendResult) { if self.health_metrics_enabled { - let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags); + let emitter = MetricsEmitter::new(self.dogstatsd.as_deref(), &self.common_stats_tags); emitter.emit_from_send_result(result); } } diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 81996006d6..2fdaa29728 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -49,6 +49,12 @@ pub(crate) struct StatsContext<'a, R: SharedRuntime> { pub metadata: &'a TracerMetadata, pub endpoint_url: &'a http::Uri, pub shared_runtime: &'a R, + pub stats_cardinality_limit: Option, + /// Optional DogStatsD client forwarded to the [`StatsExporter`]. + pub dogstatsd: Option>, + /// Optional telemetry handle forwarded to the [`StatsExporter`]. + #[cfg(feature = "telemetry")] + pub telemetry: Option, } #[derive(Debug)] @@ -71,6 +77,7 @@ pub(crate) enum StatsComputationStatus { #[cfg_attr(target_arch = "wasm32", allow(dead_code))] pub(crate) struct StatsComputationConfig { pub(crate) status: ArcSwap, + pub(crate) stats_cardinality_limit: Option, #[cfg(feature = "stats-obfuscation")] pub(crate) obfuscation_config: SharedStatsComputationObfuscationConfig, /// Builder-level opt-in. When false, stats obfuscation stays off @@ -135,7 +142,7 @@ pub(crate) fn start_stats_computation< std::time::SystemTime::now(), span_kinds, peer_tags, - None, + ctx.stats_cardinality_limit, #[cfg(feature = "stats-obfuscation")] Some(client_side_stats.obfuscation_config.clone()), ))); @@ -167,8 +174,8 @@ fn create_and_start_stats_worker< #[cfg(feature = "stats-obfuscation")] SUPPORTED_OBFUSCATION_VERSION_STR, #[cfg(feature = "telemetry")] - None, - None, + ctx.telemetry.clone(), + ctx.dogstatsd.clone(), ); let worker_handle = ctx .shared_runtime