From 00e1e9acb4b7584edd67858fee5dd401291f24a8 Mon Sep 17 00:00:00 2001 From: Brett Langdon Date: Thu, 2 Jul 2026 10:11:47 -0400 Subject: [PATCH] POC encode links and events as JSON --- .../src/trace_exporter/builder.rs | 9 + libdd-data-pipeline/src/trace_exporter/mod.rs | 224 +++++++++++++++++- libdd-trace-utils/src/span/json_encode.rs | 216 +++++++++++++++++ libdd-trace-utils/src/span/mod.rs | 1 + libdd-trace-utils/src/span/v05/mod.rs | 7 + libdd-trace-utils/src/trace_utils.rs | 6 + 6 files changed, 459 insertions(+), 4 deletions(-) create mode 100644 libdd-trace-utils/src/span/json_encode.rs diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 1153a6dbd7..17fa926353 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -88,6 +88,7 @@ pub struct TraceExporterBuilder { otlp_metrics_headers: Vec<(String, String)>, otel_trace_semantics_enabled: bool, runtime_id: Option, + native_span_events: bool, } impl TraceExporterBuilder { @@ -361,6 +362,13 @@ impl TraceExporterBuilder { self } + /// When enabled, span events are emitted as native `span_events` fields (v0.4 only). + /// When disabled (default), span events are JSON-encoded into `meta["events"]`. + pub fn set_native_span_events(&mut self, enabled: bool) -> &mut Self { + self.native_span_events = enabled; + self + } + /// Build the [`TraceExporter`] synchronously. /// /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. @@ -632,6 +640,7 @@ impl TraceExporterBuilder { otlp_config, trace_filterer: ArcSwap::from_pointee(TraceFilterer::with_empty_conf()), otlp_stats_enabled, + native_span_events: self.native_span_events, }) } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 554c88efb8..64a6d8fbb6 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -45,7 +45,7 @@ use libdd_trace_utils::msgpack_decoder; use libdd_trace_utils::send_with_retry::{ send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult, }; -use libdd_trace_utils::span::{v04::Span, TraceData}; +use libdd_trace_utils::span::{v04::Span, SpanText, TraceData}; use libdd_trace_utils::trace_utils::TracerHeaderTags; use std::io; use std::sync::atomic::{AtomicBool, Ordering}; @@ -220,6 +220,87 @@ pub struct TraceExporter( + traces: &mut [Vec>], + output_format: TraceExporterOutputFormat, + native_span_events: bool, +) where + T::Text: From, +{ + use libdd_trace_utils::span::json_encode::{ + span_events_to_json, span_links_to_json, SPAN_EVENTS_KEY, SPAN_LINKS_KEY, + }; + + let encode_links = matches!(output_format, TraceExporterOutputFormat::V05); + let encode_events = matches!( + output_format, + TraceExporterOutputFormat::V04 | TraceExporterOutputFormat::V05 + ) && !native_span_events; + + if !encode_links && !encode_events { + return; + } + + // Inserts a JSON string into `meta[key]` and returns true if the caller should clear its vec. + // Logs a debug warning when a pre-existing meta entry is about to be overwritten, and a + // warning when JSON encoding fails (leaving the native vec intact). + fn encode_into_meta( + meta: &mut libdd_trace_utils::span::v04::VecMap, + key: &'static str, + json_result: anyhow::Result, + field_name: &str, + ) -> bool + where + T::Text: From, + { + match json_result { + Ok(json) => { + if tracing::enabled!(tracing::Level::DEBUG) && meta.get(key).is_some() { + debug!( + "Span already has meta[{key:?}] and native {field_name}; \ + overwriting with re-encoded value" + ); + } + meta.insert(T::Text::from_static_str(key), T::Text::from(json)); + true + } + Err(e) => { + warn!("Failed to JSON-encode {field_name}: {e}"); + false + } + } + } + + for chunk in traces.iter_mut() { + for span in chunk.iter_mut() { + if encode_links && !span.span_links.is_empty() { + let json = span_links_to_json(&span.span_links); + if encode_into_meta::(&mut span.meta, SPAN_LINKS_KEY, json, "span_links") { + span.span_links.clear(); + } + } + if encode_events && !span.span_events.is_empty() { + let json = span_events_to_json(&span.span_events); + if encode_into_meta::(&mut span.meta, SPAN_EVENTS_KEY, json, "span_events") { + span.span_events.clear(); + } + } + } + } } impl TraceExporter { @@ -516,7 +597,10 @@ impl Tra &self, trace_chunks: Vec>>, cancellation_token: Option<&CancellationToken>, - ) -> Result { + ) -> Result + where + T::Text: From, + { self.shared_runtime.block_on(async { match cancellation_token { Some(token) => { @@ -543,9 +627,24 @@ impl Tra /// * Err(TraceExporterError): An error detailing what went wrong in the process pub async fn send_trace_chunks_async( &self, - trace_chunks: Vec>>, - ) -> Result { + mut trace_chunks: Vec>>, + ) -> Result + where + T::Text: From, + { self.check_agent_info().await; + // Encode span_links/events to JSON-in-meta for the agent path. + // Skipped for OTLP: the OTLP mapper consumes native span_links/span_events directly. + // Not called from send_async (the msgpack proxy path): the Python side has already + // handled encoding when serializing to msgpack. + if self.otlp_config.is_none() { + let effective_format = self.effective_output_format(); + preprocess_spans_for_encoding( + &mut trace_chunks, + effective_format, + self.native_span_events, + ); + } self.send_trace_chunks_inner(trace_chunks).await } @@ -2116,6 +2215,7 @@ mod single_threaded_tests { use crate::agent_info; use httpmock::prelude::*; use libdd_capabilities_impl::NativeCapabilities; + use libdd_tinybytes::BytesString; use libdd_trace_utils::msgpack_encoder; use libdd_trace_utils::span::v04::SpanBytes; @@ -2507,4 +2607,120 @@ mod single_threaded_tests { ); mock_v04.assert(); } + + // Tests for preprocess_spans_for_encoding + + fn make_span_with_link_and_event() -> SpanBytes { + use libdd_trace_utils::span::v04::{ + AttributeAnyValue, AttributeArrayValue, SpanEvent, SpanLink, + }; + use std::collections::HashMap; + + let mut attrs = HashMap::new(); + attrs.insert(BytesString::from("key"), BytesString::from("val")); + + SpanBytes { + name: BytesString::from("op"), + span_links: vec![SpanLink { + trace_id: 0xaabbccdd, + trace_id_high: 0x11223344, + span_id: 0x55667788, + attributes: attrs, + tracestate: BytesString::from("dd=s:1"), + flags: 1, + }], + span_events: vec![SpanEvent { + time_unix_nano: 1000, + name: BytesString::from("evt"), + attributes: { + let mut m = HashMap::new(); + m.insert( + BytesString::from("k"), + AttributeAnyValue::SingleValue(AttributeArrayValue::Boolean(true)), + ); + m + }, + }], + ..Default::default() + } + } + + #[test] + fn test_preprocess_v05_encodes_links_and_events_as_json() { + let mut traces = vec![vec![make_span_with_link_and_event()]]; + preprocess_spans_for_encoding(&mut traces, TraceExporterOutputFormat::V05, false); + let span = &traces[0][0]; + assert!(span.span_links.is_empty(), "span_links should be cleared"); + assert!(span.span_events.is_empty(), "span_events should be cleared"); + + let links_json = span + .meta + .get("_dd.span_links") + .expect("meta[_dd.span_links] missing"); + let events_json = span.meta.get("events").expect("meta[events] missing"); + + let links: Vec = serde_json::from_str(links_json.as_str()).unwrap(); + assert_eq!(links.len(), 1); + assert_eq!(links[0]["trace_id"], "000000001122334400000000aabbccdd"); + assert_eq!(links[0]["span_id"], "0000000055667788"); + assert_eq!(links[0]["flags"], 1); + assert_eq!(links[0]["tracestate"], "dd=s:1"); + assert_eq!(links[0]["attributes"]["key"], "val"); + + let events: Vec = serde_json::from_str(events_json.as_str()).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0]["name"], "evt"); + assert_eq!(events[0]["time_unix_nano"], 1000u64); + assert_eq!(events[0]["attributes"]["k"], true); + } + + #[test] + fn test_preprocess_v04_native_events_off_encodes_events_not_links() { + let mut traces = vec![vec![make_span_with_link_and_event()]]; + preprocess_spans_for_encoding( + &mut traces, + TraceExporterOutputFormat::V04, + false, // native_span_events=false (default) + ); + let span = &traces[0][0]; + // links should stay native for v0.4 + assert!( + !span.span_links.is_empty(), + "span_links should remain native for v0.4" + ); + // events should be converted to JSON + assert!(span.span_events.is_empty(), "span_events should be cleared"); + assert!( + span.meta.get("events").is_some(), + "meta[events] should be set" + ); + // links meta key should NOT be set + assert!( + span.meta.get("_dd.span_links").is_none(), + "meta[_dd.span_links] should not be set for v0.4" + ); + } + + #[test] + fn test_preprocess_v04_native_events_on_no_changes() { + let mut traces = vec![vec![make_span_with_link_and_event()]]; + preprocess_spans_for_encoding( + &mut traces, + TraceExporterOutputFormat::V04, + true, // native_span_events=true + ); + let span = &traces[0][0]; + assert!( + !span.span_links.is_empty(), + "span_links should remain native" + ); + assert!( + !span.span_events.is_empty(), + "span_events should remain native" + ); + assert!( + span.meta.get("events").is_none(), + "meta[events] should not be set" + ); + } } diff --git a/libdd-trace-utils/src/span/json_encode.rs b/libdd-trace-utils/src/span/json_encode.rs new file mode 100644 index 0000000000..d9060ffa01 --- /dev/null +++ b/libdd-trace-utils/src/span/json_encode.rs @@ -0,0 +1,216 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, SpanEvent, SpanLink}; +use crate::span::TraceData; +use serde_json::{Map, Number, Value}; +use std::borrow::Borrow; + +pub const SPAN_LINKS_KEY: &str = "_dd.span_links"; +pub const SPAN_EVENTS_KEY: &str = "events"; + +fn attribute_array_value_to_json(v: &AttributeArrayValue) -> Value { + match v { + AttributeArrayValue::String(s) => Value::String(s.borrow().to_owned()), + AttributeArrayValue::Boolean(b) => Value::Bool(*b), + AttributeArrayValue::Integer(i) => Value::Number(Number::from(*i)), + AttributeArrayValue::Double(d) => Number::from_f64(*d) + .map(Value::Number) + .unwrap_or(Value::Null), + } +} + +fn attribute_any_value_to_json(v: &AttributeAnyValue) -> Value { + match v { + AttributeAnyValue::SingleValue(sv) => attribute_array_value_to_json(sv), + AttributeAnyValue::Array(arr) => { + Value::Array(arr.iter().map(attribute_array_value_to_json).collect()) + } + } +} + +/// JSON-encodes span links into the legacy `meta["_dd.span_links"]` format. +/// +/// Format: `[{"trace_id": "<32-hex>", "span_id": "<16-hex>", "attributes": {...}, ...}]` +pub fn span_links_to_json(links: &[SpanLink]) -> anyhow::Result { + let arr: Vec = links + .iter() + .map(|link| { + let mut obj = Map::new(); + obj.insert( + "trace_id".to_owned(), + Value::String(format!("{:016x}{:016x}", link.trace_id_high, link.trace_id)), + ); + obj.insert( + "span_id".to_owned(), + Value::String(format!("{:016x}", link.span_id)), + ); + if !link.attributes.is_empty() { + let attrs: Map = link + .attributes + .iter() + .map(|(k, v)| (k.borrow().to_owned(), Value::String(v.borrow().to_owned()))) + .collect(); + obj.insert("attributes".to_owned(), Value::Object(attrs)); + } + if !link.tracestate.borrow().is_empty() { + obj.insert( + "tracestate".to_owned(), + Value::String(link.tracestate.borrow().to_owned()), + ); + } + if link.flags != 0 { + obj.insert("flags".to_owned(), Value::Number(Number::from(link.flags))); + } + Value::Object(obj) + }) + .collect(); + Ok(serde_json::to_string(&arr)?) +} + +/// JSON-encodes span events into the legacy `meta["events"]` format. +/// +/// Format: `[{"name": "", "time_unix_nano": , "attributes": {...}}]` +pub fn span_events_to_json(events: &[SpanEvent]) -> anyhow::Result { + let arr: Vec = events + .iter() + .map(|event| { + let mut obj = Map::new(); + obj.insert( + "name".to_owned(), + Value::String(event.name.borrow().to_owned()), + ); + obj.insert( + "time_unix_nano".to_owned(), + Value::Number(Number::from(event.time_unix_nano)), + ); + if !event.attributes.is_empty() { + let attrs: Map = event + .attributes + .iter() + .map(|(k, v)| (k.borrow().to_owned(), attribute_any_value_to_json(v))) + .collect(); + obj.insert("attributes".to_owned(), Value::Object(attrs)); + } + Value::Object(obj) + }) + .collect(); + Ok(serde_json::to_string(&arr)?) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, SpanEvent, SpanLink}; + use crate::span::SliceData; + use std::collections::HashMap; + + #[test] + fn test_span_links_to_json_basic() { + let links: Vec>> = vec![SpanLink { + trace_id: 0x1234567890abcdef, + trace_id_high: 0xfedcba0987654321, + span_id: 0xabcdef1234567890, + attributes: HashMap::new(), + tracestate: "", + flags: 0, + }]; + let json = span_links_to_json(&links).unwrap(); + let parsed: Vec = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.len(), 1); + assert_eq!(parsed[0]["trace_id"], "fedcba09876543211234567890abcdef"); + assert_eq!(parsed[0]["span_id"], "abcdef1234567890"); + assert!(!parsed[0].as_object().unwrap().contains_key("attributes")); + assert!(!parsed[0].as_object().unwrap().contains_key("tracestate")); + assert!(!parsed[0].as_object().unwrap().contains_key("flags")); + } + + #[test] + fn test_span_links_to_json_with_all_fields() { + let mut attrs = HashMap::new(); + attrs.insert("key", "value"); + let links: Vec>> = vec![SpanLink { + trace_id: 1, + trace_id_high: 0, + span_id: 2, + attributes: attrs, + tracestate: "dd=s:1", + flags: 1, + }]; + let json = span_links_to_json(&links).unwrap(); + let parsed: Vec = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed[0]["attributes"]["key"], "value"); + assert_eq!(parsed[0]["tracestate"], "dd=s:1"); + assert_eq!(parsed[0]["flags"], 1); + } + + #[test] + fn test_span_links_to_json_empty() { + let links: Vec>> = vec![]; + let json = span_links_to_json(&links).unwrap(); + assert_eq!(json, "[]"); + } + + #[test] + fn test_span_events_to_json_basic() { + let events: Vec>> = vec![SpanEvent { + time_unix_nano: 1727211691770716000, + name: "exception", + attributes: HashMap::new(), + }]; + let json = span_events_to_json(&events).unwrap(); + let parsed: Vec = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.len(), 1); + assert_eq!(parsed[0]["name"], "exception"); + assert_eq!(parsed[0]["time_unix_nano"], 1727211691770716000u64); + assert!(!parsed[0].as_object().unwrap().contains_key("attributes")); + } + + #[test] + fn test_span_events_to_json_with_attributes() { + let mut attrs = HashMap::new(); + attrs.insert( + "exception.type", + AttributeAnyValue::SingleValue(AttributeArrayValue::String("ValueError")), + ); + attrs.insert( + "exception.escaped", + AttributeAnyValue::SingleValue(AttributeArrayValue::Boolean(false)), + ); + attrs.insert( + "exception.count", + AttributeAnyValue::SingleValue(AttributeArrayValue::Integer(3)), + ); + attrs.insert( + "exception.rate", + AttributeAnyValue::SingleValue(AttributeArrayValue::Double(0.5)), + ); + attrs.insert( + "stack.frames", + AttributeAnyValue::Array(vec![ + AttributeArrayValue::String("frame1"), + AttributeArrayValue::String("frame2"), + ]), + ); + let events: Vec>> = vec![SpanEvent { + time_unix_nano: 100, + name: "test", + attributes: attrs, + }]; + let json = span_events_to_json(&events).unwrap(); + let parsed: Vec = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed[0]["attributes"]["exception.type"], "ValueError"); + assert_eq!(parsed[0]["attributes"]["exception.escaped"], false); + assert_eq!(parsed[0]["attributes"]["exception.count"], 3); + assert_eq!(parsed[0]["attributes"]["exception.rate"], 0.5); + let frames = parsed[0]["attributes"]["stack.frames"].as_array().unwrap(); + assert_eq!(frames.len(), 2); + } + + #[test] + fn test_span_events_to_json_empty() { + let events: Vec>> = vec![]; + let json = span_events_to_json(&events).unwrap(); + assert_eq!(json, "[]"); + } +} diff --git a/libdd-trace-utils/src/span/mod.rs b/libdd-trace-utils/src/span/mod.rs index 1a122efe99..cb4c2c8ac0 100644 --- a/libdd-trace-utils/src/span/mod.rs +++ b/libdd-trace-utils/src/span/mod.rs @@ -1,6 +1,7 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +pub mod json_encode; pub mod trace_utils; pub mod v04; pub mod v05; diff --git a/libdd-trace-utils/src/span/v05/mod.rs b/libdd-trace-utils/src/span/v05/mod.rs index 905dc1815a..7030281583 100644 --- a/libdd-trace-utils/src/span/v05/mod.rs +++ b/libdd-trace-utils/src/span/v05/mod.rs @@ -28,6 +28,13 @@ pub struct Span { pub r#type: u32, } +/// Convert a v0.4 span to the interned v0.5 representation. +/// +/// **Precondition**: `span_links` and `span_events` must already be JSON-encoded into +/// `span.meta["_dd.span_links"]` and `span.meta["events"]` respectively before calling +/// this function. The v0.5 `Span` struct has no native fields for these; any non-empty +/// `span_links`/`span_events` vecs are silently dropped. Use +/// `preprocess_spans_for_encoding` (in `libdd-data-pipeline`) before conversion. pub fn from_v04_span( span: crate::span::v04::Span, dict: &mut SharedDict, diff --git a/libdd-trace-utils/src/trace_utils.rs b/libdd-trace-utils/src/trace_utils.rs index 851ac54beb..cd015ffa4b 100644 --- a/libdd-trace-utils/src/trace_utils.rs +++ b/libdd-trace-utils/src/trace_utils.rs @@ -583,6 +583,12 @@ pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) { } } +/// Collect and optionally convert trace chunks to the target encoding. +/// +/// **V05 callers**: `span_links` and `span_events` on each span must be JSON-encoded +/// into `meta["_dd.span_links"]` and `meta["events"]` before calling this function — +/// see `preprocess_spans_for_encoding` in `libdd-data-pipeline`. The v0.5 struct has no +/// native fields for these; non-empty vecs are silently dropped by `from_v04_span`. pub fn collect_trace_chunks( traces: Vec>>, format: TraceEncoding,