Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub struct TraceExporterBuilder {
otlp_metrics_headers: Vec<(String, String)>,
otel_trace_semantics_enabled: bool,
runtime_id: Option<String>,
native_span_events: bool,
}

impl TraceExporterBuilder {
Expand Down Expand Up @@ -361,6 +362,13 @@ impl TraceExporterBuilder {
self
}

/// Selects how span events are handed to the encoder.
///
/// When enabled, span events are emitted as native `span_events` fields (v0.4 only).
/// When disabled (default), span events are JSON-encoded into `meta["events"]`.
///
/// Returns `&mut Self` so that builder calls can be chained.
pub fn set_native_span_events(&mut self, enabled: bool) -> &mut Self {
// Only recorded here; the value is copied into the exporter when it is built.
self.native_span_events = enabled;
self
}

/// Build the [`TraceExporter`] synchronously.
///
/// Sync facade over [`Self::build_async`]; panics inside an existing tokio context.
Expand Down Expand Up @@ -632,6 +640,7 @@ impl TraceExporterBuilder {
otlp_config,
trace_filterer: ArcSwap::from_pointee(TraceFilterer::with_empty_conf()),
otlp_stats_enabled,
native_span_events: self.native_span_events,
})
}

Expand Down
224 changes: 220 additions & 4 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use libdd_trace_utils::msgpack_decoder;
use libdd_trace_utils::send_with_retry::{
send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
};
use libdd_trace_utils::span::{v04::Span, TraceData};
use libdd_trace_utils::span::{v04::Span, SpanText, TraceData};
use libdd_trace_utils::trace_utils::TracerHeaderTags;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -220,6 +220,87 @@ pub struct TraceExporter<C: HttpClientCapability + SleepCapability + MaybeSend +
/// skipped.
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
otlp_stats_enabled: bool,
/// When false (default), span events are JSON-encoded into `meta["events"]`.
/// When true, span events are emitted as native `span_events` fields (v0.4 only).
native_span_events: bool,
}

/// Converts span links and events to the legacy JSON-in-meta format before serialization.
///
/// What is rewritten depends on `output_format`:
///
/// * v0.5: `span_links` → `meta["_dd.span_links"]` and `span_events` → `meta["events"]`.
///   Events are encoded regardless of `native_span_events`, because native span events
///   are a v0.4-only feature; honouring the flag here would leave the events in the
///   native vec where the v0.5 payload cannot carry them.
/// * v0.4: `span_events` → `meta["events"]` only when `native_span_events` is `false`.
///   Span links are always left native.
/// * Any other format: spans are left untouched.
///
/// After a successful encode the corresponding native vec is cleared so encoders don't
/// emit duplicate data. If JSON encoding fails, a warning is logged and the native vec is
/// left intact.
///
/// Called only from the native-span path (`send_trace_chunks_async`). The msgpack proxy
/// path (`send_async`) skips this step because the Python side has already handled encoding.
///
/// NOTE(review): earlier documentation also mentioned "v1" output alongside v0.4, but only
/// `V04` and `V05` are matched here — confirm whether another format needs handling.
fn preprocess_spans_for_encoding<T: TraceData>(
    traces: &mut [Vec<Span<T>>],
    output_format: TraceExporterOutputFormat,
    native_span_events: bool,
) where
    T::Text: From<String>,
{
    use libdd_trace_utils::span::json_encode::{
        span_events_to_json, span_links_to_json, SPAN_EVENTS_KEY, SPAN_LINKS_KEY,
    };

    let is_v04 = matches!(output_format, TraceExporterOutputFormat::V04);
    let is_v05 = matches!(output_format, TraceExporterOutputFormat::V05);

    // Links only need the JSON fallback on v0.5.
    let encode_links = is_v05;
    // v0.5 must always fall back to JSON for events; v0.4 does so only when the caller
    // has not opted into native span events.
    let encode_events = is_v05 || (is_v04 && !native_span_events);

    if !encode_links && !encode_events {
        return;
    }

    // Inserts a JSON string into `meta[key]` and returns true if the caller should clear its
    // vec. Emits a debug log when a pre-existing meta entry is about to be overwritten, and a
    // warning when JSON encoding fails (leaving the native vec intact).
    fn encode_into_meta<T: TraceData>(
        meta: &mut libdd_trace_utils::span::v04::VecMap<T::Text, T::Text>,
        key: &'static str,
        json_result: anyhow::Result<String>,
        field_name: &str,
    ) -> bool
    where
        T::Text: From<String>,
    {
        match json_result {
            Ok(json) => {
                // The lookup is only worth doing when the debug line would be emitted.
                if tracing::enabled!(tracing::Level::DEBUG) && meta.get(key).is_some() {
                    debug!(
                        "Span already has meta[{key:?}] and native {field_name}; \
overwriting with re-encoded value"
                    );
                }
                meta.insert(T::Text::from_static_str(key), T::Text::from(json));
                true
            }
            Err(e) => {
                warn!("Failed to JSON-encode {field_name}: {e}");
                false
            }
        }
    }

    for span in traces.iter_mut().flatten() {
        if encode_links && !span.span_links.is_empty() {
            let json = span_links_to_json(&span.span_links);
            if encode_into_meta::<T>(&mut span.meta, SPAN_LINKS_KEY, json, "span_links") {
                span.span_links.clear();
            }
        }
        if encode_events && !span.span_events.is_empty() {
            let json = span_events_to_json(&span.span_events);
            if encode_into_meta::<T>(&mut span.meta, SPAN_EVENTS_KEY, json, "span_events") {
                span.span_events.clear();
            }
        }
    }
}

impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> TraceExporter<C> {
Expand Down Expand Up @@ -516,7 +597,10 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
&self,
trace_chunks: Vec<Vec<Span<T>>>,
cancellation_token: Option<&CancellationToken>,
) -> Result<AgentResponse, TraceExporterError> {
) -> Result<AgentResponse, TraceExporterError>
where
T::Text: From<String>,
{
self.shared_runtime.block_on(async {
match cancellation_token {
Some(token) => {
Expand All @@ -543,9 +627,24 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
/// * Err(TraceExporterError): An error detailing what went wrong in the process
pub async fn send_trace_chunks_async<T: TraceData>(
&self,
trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
mut trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError>
where
T::Text: From<String>,
{
self.check_agent_info().await;
// `trace_chunks` is bound as `mut` so the spans can be rewritten in place below; the
// `T::Text: From<String>` bound is what `preprocess_spans_for_encoding` requires to
// store the generated JSON strings in `meta`.
//
// Encode span_links/events to JSON-in-meta for the agent path.
// Skipped for OTLP: the OTLP mapper consumes native span_links/span_events directly.
// Not called from send_async (the msgpack proxy path): the Python side has already
// handled encoding when serializing to msgpack.
if self.otlp_config.is_none() {
// The rewrite is driven by the effective output format, as reported by
// `effective_output_format()`.
let effective_format = self.effective_output_format();
preprocess_spans_for_encoding(
&mut trace_chunks,
effective_format,
self.native_span_events,
);
}
// Hand the (possibly rewritten) chunks to the inner send routine.
self.send_trace_chunks_inner(trace_chunks).await
}

Expand Down Expand Up @@ -2116,6 +2215,7 @@ mod single_threaded_tests {
use crate::agent_info;
use httpmock::prelude::*;
use libdd_capabilities_impl::NativeCapabilities;
use libdd_tinybytes::BytesString;
use libdd_trace_utils::msgpack_encoder;
use libdd_trace_utils::span::v04::SpanBytes;

Expand Down Expand Up @@ -2507,4 +2607,120 @@ mod single_threaded_tests {
);
mock_v04.assert();
}

// Tests for preprocess_spans_for_encoding

/// Builds a span named `"op"` carrying exactly one span link and one span event, each
/// with a single attribute, for exercising `preprocess_spans_for_encoding`.
fn make_span_with_link_and_event() -> SpanBytes {
    use libdd_trace_utils::span::v04::{
        AttributeAnyValue, AttributeArrayValue, SpanEvent, SpanLink,
    };
    use std::collections::HashMap;

    SpanBytes {
        name: BytesString::from("op"),
        // One link with distinct high/low trace-id halves and a string attribute.
        span_links: vec![SpanLink {
            trace_id: 0xaabbccdd,
            trace_id_high: 0x11223344,
            span_id: 0x55667788,
            attributes: HashMap::from([(BytesString::from("key"), BytesString::from("val"))]),
            tracestate: BytesString::from("dd=s:1"),
            flags: 1,
        }],
        // One event with a single boolean attribute.
        span_events: vec![SpanEvent {
            time_unix_nano: 1000,
            name: BytesString::from("evt"),
            attributes: HashMap::from([(
                BytesString::from("k"),
                AttributeAnyValue::SingleValue(AttributeArrayValue::Boolean(true)),
            )]),
        }],
        ..Default::default()
    }
}

#[test]
fn test_preprocess_v05_encodes_links_and_events_as_json() {
    // v0.5 with native events off: links and events must both move into meta as JSON.
    let mut chunks = vec![vec![make_span_with_link_and_event()]];
    preprocess_spans_for_encoding(&mut chunks, TraceExporterOutputFormat::V05, false);

    let encoded = &chunks[0][0];
    assert!(encoded.span_links.is_empty(), "span_links should be cleared");
    assert!(encoded.span_events.is_empty(), "span_events should be cleared");

    let raw_links = encoded
        .meta
        .get("_dd.span_links")
        .expect("meta[_dd.span_links] missing");
    let raw_events = encoded.meta.get("events").expect("meta[events] missing");

    // The link round-trips with hex-formatted ids and its attributes intact.
    let parsed_links: Vec<serde_json::Value> =
        serde_json::from_str(raw_links.as_str()).unwrap();
    assert_eq!(parsed_links.len(), 1);
    let link = &parsed_links[0];
    assert_eq!(link["trace_id"], "000000001122334400000000aabbccdd");
    assert_eq!(link["span_id"], "0000000055667788");
    assert_eq!(link["flags"], 1);
    assert_eq!(link["tracestate"], "dd=s:1");
    assert_eq!(link["attributes"]["key"], "val");

    // The event round-trips with its name, timestamp and boolean attribute.
    let parsed_events: Vec<serde_json::Value> =
        serde_json::from_str(raw_events.as_str()).unwrap();
    assert_eq!(parsed_events.len(), 1);
    let event = &parsed_events[0];
    assert_eq!(event["name"], "evt");
    assert_eq!(event["time_unix_nano"], 1000u64);
    assert_eq!(event["attributes"]["k"], true);
}

#[test]
fn test_preprocess_v04_native_events_off_encodes_events_not_links() {
    // Default configuration: native span events are disabled.
    let native_span_events = false;
    let mut chunks = vec![vec![make_span_with_link_and_event()]];
    preprocess_spans_for_encoding(
        &mut chunks,
        TraceExporterOutputFormat::V04,
        native_span_events,
    );

    let processed = &chunks[0][0];
    let links_kept_native = !processed.span_links.is_empty();
    let events_in_meta = processed.meta.get("events").is_some();
    let links_in_meta = processed.meta.get("_dd.span_links").is_some();

    // v0.4 carries links natively, so they must be untouched.
    assert!(links_kept_native, "span_links should remain native for v0.4");
    // Events fall back to JSON-in-meta and the native vec is emptied.
    assert!(processed.span_events.is_empty(), "span_events should be cleared");
    assert!(events_in_meta, "meta[events] should be set");
    // No JSON copy of the links may appear in meta.
    assert!(
        !links_in_meta,
        "meta[_dd.span_links] should not be set for v0.4"
    );
}

#[test]
fn test_preprocess_v04_native_events_on_no_changes() {
    // With native span events enabled on v0.4 the preprocessing step is a no-op.
    let native_span_events = true;
    let mut chunks = vec![vec![make_span_with_link_and_event()]];
    preprocess_spans_for_encoding(
        &mut chunks,
        TraceExporterOutputFormat::V04,
        native_span_events,
    );

    let untouched = &chunks[0][0];
    let has_links = !untouched.span_links.is_empty();
    let has_events = !untouched.span_events.is_empty();

    assert!(has_links, "span_links should remain native");
    assert!(has_events, "span_events should remain native");
    assert!(
        untouched.meta.get("events").is_none(),
        "meta[events] should not be set"
    );
}
}
Loading