Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub struct TraceExporterConfig {
otlp_protocol: Option<OtlpProtocol>,
output_to_log: bool,
log_max_line_size: Option<usize>,
stats_cardinality_limit: Option<usize>,
}

#[no_mangle]
Expand Down Expand Up @@ -543,6 +544,30 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol(
)
}

/// Sets the cardinality limit for client-side stats computation.
///
/// When the number of distinct stats groups exceeds `limit`, additional groups are
/// aggregated into a sentinel key instead of being tracked individually.
/// This bounds memory usage when the trace population has very high cardinality.
///
/// Has no effect unless stats computation is enabled via
/// `ddog_trace_exporter_config_set_compute_stats`.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_config_set_stats_cardinality_limit(
config: Option<&mut TraceExporterConfig>,
limit: usize,
) -> Option<Box<ExporterError>> {
catch_panic!(
if let Some(handle) = config {
handle.stats_cardinality_limit = Some(limit);
None
} else {
gen_error!(ErrorCode::InvalidArgument)
},
gen_error!(ErrorCode::Panic)
)
}

/// Configure the exporter to write traces as newline-delimited JSON to stdout (the Datadog
/// Forwarder "log exporter" path) instead of sending them to a Datadog agent. Used in serverless
/// environments (e.g. AWS Lambda) when no agent is reachable.
Expand Down Expand Up @@ -620,6 +645,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
.set_output_format(config.output_format)
.set_connection_timeout(config.connection_timeout);

if let Some(limit) = config.stats_cardinality_limit {
builder.set_stats_cardinality_limit(limit);
}

if config.compute_stats {
builder.enable_stats(Duration::from_secs(10));
} else if config.client_computed_stats {
Expand Down Expand Up @@ -753,6 +782,7 @@ mod tests {
assert!(cfg.connection_timeout.is_none());
assert!(!cfg.output_to_log);
assert_eq!(cfg.log_max_line_size, None);
assert_eq!(cfg.stats_cardinality_limit, None);

ddog_trace_exporter_config_free(cfg);
}
Expand Down Expand Up @@ -1467,6 +1497,23 @@ mod tests {
assert_eq!(ret.unwrap().code, ErrorCode::Panic);
}

#[test]
fn config_stats_cardinality_limit_test() {
unsafe {
// Null config → InvalidArgument
let error = ddog_trace_exporter_config_set_stats_cardinality_limit(None, 100);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// Valid config → value stored
let mut config = Some(TraceExporterConfig::default());
let error =
ddog_trace_exporter_config_set_stats_cardinality_limit(config.as_mut(), 500);
assert_eq!(error, None);
assert_eq!(config.unwrap().stats_cardinality_limit, Some(500));
}
}

#[test]
fn config_health_metrics_test() {
unsafe {
Expand Down
5 changes: 5 additions & 0 deletions libdd-data-pipeline/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ impl TelemetryClient {
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await;
}

/// Clone the telemetry handle
pub fn clone_handle(&self) -> TelemetryWorkerHandle {
self.worker.clone()
}
}

#[cfg(test)]
Expand Down
22 changes: 18 additions & 4 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct TraceExporterBuilder<R: SharedRuntime> {
peer_tags_aggregation: bool,
compute_stats_by_span_kind: bool,
peer_tags: Vec<String>,
stats_cardinality_limit: Option<usize>,
#[cfg(feature = "stats-obfuscation")]
client_side_stats_obfuscation_enabled: bool,
#[cfg(feature = "telemetry")]
Expand Down Expand Up @@ -142,6 +143,7 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
peer_tags_aggregation: false,
compute_stats_by_span_kind: false,
peer_tags: Vec::new(),
stats_cardinality_limit: None,
#[cfg(feature = "stats-obfuscation")]
client_side_stats_obfuscation_enabled: false,
#[cfg(feature = "telemetry")]
Expand Down Expand Up @@ -324,6 +326,18 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
self
}

/// Sets the cardinality limit for client-side stats computation.
///
/// When the number of distinct stats groups exceeds `limit`, additional groups are
/// aggregated into a sentinel key instead of being tracked individually.
/// This bounds memory usage when the trace population has very high cardinality.
///
/// Has no effect unless stats computation is enabled.
pub fn set_stats_cardinality_limit(&mut self, cardinality_limit: usize) -> &mut Self {
self.stats_cardinality_limit = Some(cardinality_limit);
self
}

/// Enable client-side stats obfuscation. Disabled by default.
///
/// Final activation also requires the agent to advertise a supported
Expand Down Expand Up @@ -561,10 +575,9 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
})?),
};

let dogstatsd = self.dogstatsd_url.and_then(|u| {
new(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return
// None
});
let dogstatsd = self
.dogstatsd_url
.and_then(|u| new(Endpoint::from_slice(&u)).ok().map(Arc::new));

let base_url = self.url.as_deref().unwrap_or(DEFAULT_AGENT_URL);

Expand Down Expand Up @@ -807,6 +820,7 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
common_stats_tags: vec![libdatadog_version],
client_side_stats: StatsComputationConfig {
status: ArcSwap::new(stats.into()),
stats_cardinality_limit: self.stats_cardinality_limit,
#[cfg(feature = "stats-obfuscation")]
obfuscation_config: Arc::new(ArcSwap::from_pointee(
StatsComputationObfuscationConfig::default(),
Expand Down
14 changes: 11 additions & 3 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub struct TraceExporter<
serializer: TraceSerializer,
shared_runtime: Arc<R>,
/// None if dogstatsd is disabled
dogstatsd: Option<Client>,
dogstatsd: Option<Arc<Client>>,
common_stats_tags: Vec<Tag>,
client_computed_top_level: bool,
client_side_stats: StatsComputationConfig,
Expand Down Expand Up @@ -469,6 +469,14 @@ impl<
metadata: &self.metadata,
endpoint_url: &self.endpoint.url,
shared_runtime: &*self.shared_runtime,
stats_cardinality_limit: self.client_side_stats.stats_cardinality_limit,
dogstatsd: if self.health_metrics_enabled {
self.dogstatsd.clone()
} else {
None
},
#[cfg(feature = "telemetry")]
telemetry: self.telemetry.as_ref().map(|t| t.clone_handle()),
};
stats::handle_stats_disabled_by_agent(
&ctx,
Expand Down Expand Up @@ -559,15 +567,15 @@ impl<
/// Emit a health metric to dogstatsd
fn emit_metric(&self, metric: HealthMetric, custom_tags: Option<Vec<&Tag>>) {
if self.health_metrics_enabled {
let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
let emitter = MetricsEmitter::new(self.dogstatsd.as_deref(), &self.common_stats_tags);
emitter.emit(metric, custom_tags);
}
}

/// Emit all health metrics from a SendResult
fn emit_send_result(&self, result: &SendResult) {
if self.health_metrics_enabled {
let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
let emitter = MetricsEmitter::new(self.dogstatsd.as_deref(), &self.common_stats_tags);
emitter.emit_from_send_result(result);
}
}
Expand Down
13 changes: 10 additions & 3 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ pub(crate) struct StatsContext<'a, R: SharedRuntime> {
pub metadata: &'a TracerMetadata,
pub endpoint_url: &'a http::Uri,
pub shared_runtime: &'a R,
pub stats_cardinality_limit: Option<usize>,
/// Optional DogStatsD client forwarded to the [`StatsExporter`].
pub dogstatsd: Option<std::sync::Arc<libdd_dogstatsd_client::Client>>,
/// Optional telemetry handle forwarded to the [`StatsExporter`].
#[cfg(feature = "telemetry")]
pub telemetry: Option<libdd_telemetry::worker::TelemetryWorkerHandle>,
}

#[derive(Debug)]
Expand All @@ -71,6 +77,7 @@ pub(crate) enum StatsComputationStatus {
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub(crate) struct StatsComputationConfig {
pub(crate) status: ArcSwap<StatsComputationStatus>,
pub(crate) stats_cardinality_limit: Option<usize>,
#[cfg(feature = "stats-obfuscation")]
pub(crate) obfuscation_config: SharedStatsComputationObfuscationConfig,
/// Builder-level opt-in. When false, stats obfuscation stays off
Expand Down Expand Up @@ -135,7 +142,7 @@ pub(crate) fn start_stats_computation<
std::time::SystemTime::now(),
span_kinds,
peer_tags,
None,
ctx.stats_cardinality_limit,
#[cfg(feature = "stats-obfuscation")]
Some(client_side_stats.obfuscation_config.clone()),
)));
Expand Down Expand Up @@ -167,8 +174,8 @@ fn create_and_start_stats_worker<
#[cfg(feature = "stats-obfuscation")]
SUPPORTED_OBFUSCATION_VERSION_STR,
#[cfg(feature = "telemetry")]
None,
None,
ctx.telemetry.clone(),
ctx.dogstatsd.clone(),
);
let worker_handle = ctx
.shared_runtime
Expand Down
Loading