Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datadog-sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ impl SidecarInterface for ConnectionSidecarHandler {
// Lazily create the concentrator on first IPC span for this (env, version, service).
if let Some(state) = get_or_create_concentrator(
&self.server.span_concentrators,
&self.server.telemetry_clients,
&env,
&version,
session_id,
Expand Down
64 changes: 59 additions & 5 deletions datadog-sidecar/src/service/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! automatically once idle: an empty drain sets the `please_reload` bit (telling PHP workers
//! to stop writing), and the subsequent flush performs a final drain before removal.

use crate::service::{InstanceId, RuntimeMetadata};
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
use datadog_ipc::shm_stats::{
Expand All @@ -18,6 +19,8 @@ use futures::{future::join_all, TryFutureExt};
use http::uri::PathAndQuery;
use libdd_capabilities_impl::{HttpClientCapability, NativeCapabilities};
use libdd_common::{Endpoint, MutexExt};
use libdd_telemetry::config::Config;
use libdd_telemetry::worker::TelemetryWorkerHandle;
use libdd_trace_stats::stats_exporter::{StatsExporter, StatsMetadata};
use std::collections::HashMap;
use std::ffi::CString;
Expand Down Expand Up @@ -104,6 +107,7 @@ fn make_exporter(
s: &SpanConcentratorState,
endpoint: Endpoint,
flush_interval: Duration,
telemetry: Option<TelemetryWorkerHandle>,
) -> StatsExporter<NativeCapabilities, ShmSpanConcentrator> {
StatsExporter::new(
flush_interval,
Expand All @@ -119,7 +123,7 @@ fn make_exporter(
)),
#[cfg(feature = "stats-obfuscation")]
"0",
None,
telemetry,
None,
)
}
Expand All @@ -135,6 +139,7 @@ pub async fn run_stats_flush_loop(
states: Weak<Mutex<HashMap<ConcentratorKey, Arc<SpanConcentratorState>>>>,
map_key: ConcentratorKey,
flush_interval: Duration,
telemetry: Option<TelemetryWorkerHandle>,
) {
let Some(arc) = states.upgrade() else {
return;
Expand All @@ -146,7 +151,13 @@ pub async fn run_stats_flush_loop(
let Some(state) = state else {
return;
};
let exporter = make_exporter(&state, state.endpoint.clone(), flush_interval);

let exporter = make_exporter(
&state,
state.endpoint.clone(),
flush_interval,
Comment on lines +154 to +158

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Refresh stats telemetry after client stops

This binds the flush loop to the telemetry worker that existed when the concentrator was created, but telemetry clients are removed on LifecycleAction::Stop while SHM concentrators are global and can remain active until an idle flush removes them. If another runtime for the same service/env continues using the existing concentrator after a Stop/start cycle, get_or_create_concentrator returns the old state and collapsed-span points keep going to the stopped worker instead of the newly created telemetry client; look up or refresh the worker when flushing rather than capturing it for the concentrator lifetime.

Useful? React with 👍 / 👎.

Comment on lines +154 to +158

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Share one collapsed-spans metric context

When one service/env has multiple active stats concentrators, such as different versions because the concentrator key includes version, each per-concentrator exporter registers a fresh datadog.tracer.stats.collapsed_spans context on the same telemetry worker. The telemetry buckets aggregate by ContextKey, not by metric name/tags, so collapsed-span counts from those exporters are flushed as separate identical series instead of a single count; cache the metric context per worker and share it across exporters.

Useful? React with 👍 / 👎.

telemetry.clone(),
);

loop {
tokio::time::sleep(flush_interval).await;
Expand Down Expand Up @@ -191,7 +202,11 @@ pub async fn run_stats_flush_loop(
guard.remove(&map_key);
}
}
if let Err(e) = exporter.send(true).await {
if let Err(e) =
make_exporter(&state, state.endpoint.clone(), flush_interval, telemetry)
.send(true)
.await
{
warn!("Failed final stats flush: {e}");
}
break;
Expand All @@ -210,6 +225,7 @@ pub async fn run_stats_flush_loop(
/// Returns `None` when stats config is not available (agentless or not yet configured).
pub(crate) fn get_or_create_concentrator(
concentrators: &Arc<Mutex<HashMap<ConcentratorKey, Arc<SpanConcentratorState>>>>,
telemetry_clients: &crate::service::telemetry::TelemetryCachedClientSet,
env: &str,
version: &str,
runtime_id: &str,
Expand Down Expand Up @@ -270,8 +286,46 @@ pub(crate) fn get_or_create_concentrator(
guard.insert(map_key.clone(), state.clone());
let weak = Arc::downgrade(concentrators);
let flush_interval = config.flush_interval;

let trace_config = session.get_trace_config();
let runtime_metadata = RuntimeMetadata::new(
trace_config.language.clone(),
trace_config.language_version.clone(),
trace_config.tracer_version.clone(),
);
drop(trace_config);
Comment on lines +290 to +296

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let trace_config = session.get_trace_config();
let runtime_metadata = RuntimeMetadata::new(
trace_config.language.clone(),
trace_config.language_version.clone(),
trace_config.tracer_version.clone(),
);
drop(trace_config);
let runtime_metadata = {
let trace_config = session.get_trace_config();
RuntimeMetadata::new(
trace_config.language.clone(),
trace_config.language_version.clone(),
trace_config.tracer_version.clone(),
)
};

let process_tags = session.process_tags.lock_or_panic().clone();
let instance_id = InstanceId {
session_id: runtime_id.to_owned(),
runtime_id: runtime_id.to_owned(),
Comment on lines +298 to +300

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use a real runtime id when seeding stats telemetry

When the first IPC fallback span for a service/env arrives before any telemetry client exists, add_span_to_concentrator passes the connection session id as this runtime_id, so this block creates a cached telemetry client whose InstanceId.runtime_id is also the session id. TelemetryCachedClient::new copies that field into builder.runtime_id, and the client is then reused by service/env, so subsequent normal telemetry for the app reports the wrong runtime id. Avoid creating a new telemetry client from this fallback without a real InstanceId, or pass the actual runtime id through the IPC path.

Useful? React with 👍 / 👎.

};
let telemetry = {
let telemetry_mutex = telemetry_clients.get_or_create(
&service_name,
env,
&instance_id,
&runtime_metadata,
|| {session
.session_config
.lock_or_panic()
.as_ref()
.cloned()
.unwrap_or_else(|| {
warn!("Session telemetry config unavailable for env={env} version={version} service={service_name}; telemetry disabled in stats");
Config::default()
})
},
Comment on lines +308 to +317

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
|| {session
.session_config
.lock_or_panic()
.as_ref()
.cloned()
.unwrap_or_else(|| {
warn!("Session telemetry config unavailable for env={env} version={version} service={service_name}; telemetry disabled in stats");
Config::default()
})
},
|| {
session
.session_config
.lock_or_panic()
.as_ref()
.cloned()
.unwrap_or_else(|| {
warn!(
"Session telemetry config unavailable for env={env} \
version={version} service={service_name}; telemetry \
disabled in stats"
);
Config::default()
})
},

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering why cargo fmt didn't format this part and according to claude:

when call args contain complex closures and rustfmt can't find valid layout within width, it falls back to leaving the expression untouched rather than producing malformed output.

with that format, cargo fmt works again.

(btw I checked and escaped newlines in strings are also un-indented so the log still looks like before)

process_tags,
);
let worker = telemetry_mutex
.lock_or_panic()
.as_ref()
.map(|c| c.worker.clone());
worker
};

tokio::spawn(async move {
run_stats_flush_loop(weak, map_key, flush_interval).await;
run_stats_flush_loop(weak, map_key, flush_interval, telemetry).await;
});
Some(state)
}
Expand All @@ -292,7 +346,7 @@ pub async fn flush_all_stats_now(
.values()
.map(|s| {
(
make_exporter(s, s.endpoint.clone(), Duration::from_secs(10)),
make_exporter(s, s.endpoint.clone(), Duration::from_secs(10), None),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Pass telemetry through explicit stats flushes

When ddog_sidecar_flush drains stats via flush(... traces_and_stats: true ...) before the periodic loop runs, this exporter is built with None, so StatsExporter::send cannot emit the datadog.tracer.stats.collapsed_spans telemetry for collapsed spans in that flush. The buckets are drained by this send, so the periodic exporter that does have a worker handle never gets another chance to report them; the flush path should use the concentrator's telemetry worker as well.

Useful? React with 👍 / 👎.

s.endpoint.clone(),
)
})
Expand Down
47 changes: 47 additions & 0 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub struct TraceExporterConfig {
otlp_protocol: Option<OtlpProtocol>,
output_to_log: bool,
log_max_line_size: Option<usize>,
stats_cardinality_limit: Option<usize>,
}

#[no_mangle]
Expand Down Expand Up @@ -543,6 +544,30 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol(
)
}

/// Sets the cardinality limit for client-side stats computation.
///
/// When the number of distinct stats groups exceeds `limit`, additional groups are
/// aggregated into a sentinel key instead of being tracked individually.
/// This bounds memory usage when the trace population has very high cardinality.
///
/// Has no effect unless stats computation is enabled via
/// `ddog_trace_exporter_config_set_compute_stats`.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_config_set_stats_cardinality_limit(
config: Option<&mut TraceExporterConfig>,
limit: usize,
) -> Option<Box<ExporterError>> {
catch_panic!(
if let Some(handle) = config {
handle.stats_cardinality_limit = Some(limit);
None
} else {
gen_error!(ErrorCode::InvalidArgument)
},
gen_error!(ErrorCode::Panic)
)
}

/// Configure the exporter to write traces as newline-delimited JSON to stdout (the Datadog
/// Forwarder "log exporter" path) instead of sending them to a Datadog agent. Used in serverless
/// environments (e.g. AWS Lambda) when no agent is reachable.
Expand Down Expand Up @@ -620,6 +645,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
.set_output_format(config.output_format)
.set_connection_timeout(config.connection_timeout);

if let Some(limit) = config.stats_cardinality_limit {
builder.set_stats_cardinality_limit(limit);
}

if config.compute_stats {
builder.enable_stats(Duration::from_secs(10));
} else if config.client_computed_stats {
Expand Down Expand Up @@ -753,6 +782,7 @@ mod tests {
assert!(cfg.connection_timeout.is_none());
assert!(!cfg.output_to_log);
assert_eq!(cfg.log_max_line_size, None);
assert_eq!(cfg.stats_cardinality_limit, None);

ddog_trace_exporter_config_free(cfg);
}
Expand Down Expand Up @@ -1467,6 +1497,23 @@ mod tests {
assert_eq!(ret.unwrap().code, ErrorCode::Panic);
}

#[test]
fn config_stats_cardinality_limit_test() {
unsafe {
// Null config → InvalidArgument
let error = ddog_trace_exporter_config_set_stats_cardinality_limit(None, 100);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// Valid config → value stored
let mut config = Some(TraceExporterConfig::default());
let error =
ddog_trace_exporter_config_set_stats_cardinality_limit(config.as_mut(), 500);
assert_eq!(error, None);
assert_eq!(config.unwrap().stats_cardinality_limit, Some(500));
}
}

#[test]
fn config_health_metrics_test() {
unsafe {
Expand Down
5 changes: 5 additions & 0 deletions libdd-data-pipeline/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ impl TelemetryClient {
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await;
}

/// Clone the telemetry handle
pub fn clone_handle(&self) -> TelemetryWorkerHandle {
self.worker.clone()
}
}

#[cfg(test)]
Expand Down
22 changes: 18 additions & 4 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct TraceExporterBuilder<R: SharedRuntime> {
peer_tags_aggregation: bool,
compute_stats_by_span_kind: bool,
peer_tags: Vec<String>,
stats_cardinality_limit: Option<usize>,
#[cfg(feature = "stats-obfuscation")]
client_side_stats_obfuscation_enabled: bool,
#[cfg(feature = "telemetry")]
Expand Down Expand Up @@ -142,6 +143,7 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
peer_tags_aggregation: false,
compute_stats_by_span_kind: false,
peer_tags: Vec::new(),
stats_cardinality_limit: None,
#[cfg(feature = "stats-obfuscation")]
client_side_stats_obfuscation_enabled: false,
#[cfg(feature = "telemetry")]
Expand Down Expand Up @@ -324,6 +326,18 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
self
}

/// Sets the cardinality limit for client-side stats computation.
///
/// When the number of distinct stats groups exceeds `limit`, additional groups are
/// aggregated into a sentinel key instead of being tracked individually.
/// This bounds memory usage when the trace population has very high cardinality.
///
/// Has no effect unless stats computation is enabled.
pub fn set_stats_cardinality_limit(&mut self, cardinality_limit: usize) -> &mut Self {
self.stats_cardinality_limit = Some(cardinality_limit);
self
}

/// Enable client-side stats obfuscation. Disabled by default.
///
/// Final activation also requires the agent to advertise a supported
Expand Down Expand Up @@ -561,10 +575,9 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
})?),
};

let dogstatsd = self.dogstatsd_url.and_then(|u| {
new(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return
// None
});
let dogstatsd = self
.dogstatsd_url
.and_then(|u| new(Endpoint::from_slice(&u)).ok().map(Arc::new));

let base_url = self.url.as_deref().unwrap_or(DEFAULT_AGENT_URL);

Expand Down Expand Up @@ -807,6 +820,7 @@ impl<R: SharedRuntime> TraceExporterBuilder<R> {
common_stats_tags: vec![libdatadog_version],
client_side_stats: StatsComputationConfig {
status: ArcSwap::new(stats.into()),
stats_cardinality_limit: self.stats_cardinality_limit,
#[cfg(feature = "stats-obfuscation")]
obfuscation_config: Arc::new(ArcSwap::from_pointee(
StatsComputationObfuscationConfig::default(),
Expand Down
14 changes: 11 additions & 3 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub struct TraceExporter<
serializer: TraceSerializer,
shared_runtime: Arc<R>,
/// None if dogstatsd is disabled
dogstatsd: Option<Client>,
dogstatsd: Option<Arc<Client>>,
common_stats_tags: Vec<Tag>,
client_computed_top_level: bool,
client_side_stats: StatsComputationConfig,
Expand Down Expand Up @@ -469,6 +469,14 @@ impl<
metadata: &self.metadata,
endpoint_url: &self.endpoint.url,
shared_runtime: &*self.shared_runtime,
stats_cardinality_limit: self.client_side_stats.stats_cardinality_limit,
dogstatsd: if self.health_metrics_enabled {
self.dogstatsd.clone()
} else {
None
},
#[cfg(feature = "telemetry")]
telemetry: self.telemetry.as_ref().map(|t| t.clone_handle()),
};
stats::handle_stats_disabled_by_agent(
&ctx,
Expand Down Expand Up @@ -559,15 +567,15 @@ impl<
/// Emit a health metric to dogstatsd
fn emit_metric(&self, metric: HealthMetric, custom_tags: Option<Vec<&Tag>>) {
if self.health_metrics_enabled {
let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
let emitter = MetricsEmitter::new(self.dogstatsd.as_deref(), &self.common_stats_tags);
emitter.emit(metric, custom_tags);
}
}

/// Emit all health metrics from a SendResult
fn emit_send_result(&self, result: &SendResult) {
if self.health_metrics_enabled {
let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
let emitter = MetricsEmitter::new(self.dogstatsd.as_deref(), &self.common_stats_tags);
emitter.emit_from_send_result(result);
}
}
Expand Down
13 changes: 10 additions & 3 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ pub(crate) struct StatsContext<'a, R: SharedRuntime> {
pub metadata: &'a TracerMetadata,
pub endpoint_url: &'a http::Uri,
pub shared_runtime: &'a R,
pub stats_cardinality_limit: Option<usize>,
/// Optional DogStatsD client forwarded to the [`StatsExporter`].
pub dogstatsd: Option<std::sync::Arc<libdd_dogstatsd_client::Client>>,
/// Optional telemetry handle forwarded to the [`StatsExporter`].
#[cfg(feature = "telemetry")]
pub telemetry: Option<libdd_telemetry::worker::TelemetryWorkerHandle>,
}

#[derive(Debug)]
Expand All @@ -71,6 +77,7 @@ pub(crate) enum StatsComputationStatus {
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub(crate) struct StatsComputationConfig {
pub(crate) status: ArcSwap<StatsComputationStatus>,
pub(crate) stats_cardinality_limit: Option<usize>,
#[cfg(feature = "stats-obfuscation")]
pub(crate) obfuscation_config: SharedStatsComputationObfuscationConfig,
/// Builder-level opt-in. When false, stats obfuscation stays off
Expand Down Expand Up @@ -135,7 +142,7 @@ pub(crate) fn start_stats_computation<
std::time::SystemTime::now(),
span_kinds,
peer_tags,
None,
ctx.stats_cardinality_limit,
#[cfg(feature = "stats-obfuscation")]
Some(client_side_stats.obfuscation_config.clone()),
)));
Expand Down Expand Up @@ -167,8 +174,8 @@ fn create_and_start_stats_worker<
#[cfg(feature = "stats-obfuscation")]
SUPPORTED_OBFUSCATION_VERSION_STR,
#[cfg(feature = "telemetry")]
None,
None,
ctx.telemetry.clone(),
ctx.dogstatsd.clone(),
);
let worker_handle = ctx
.shared_runtime
Expand Down
Loading