diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 7cfd62a5a0..e60eef5f10 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -17,7 +17,7 @@ use libdd_trace_utils::msgpack_encoder; use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::{self, TracerHeaderTags}; use libdd_trace_utils::tracer_metadata::TracerMetadata; -use libdd_trace_utils::tracer_payload::{self, TraceEncoding}; +use libdd_trace_utils::tracer_payload::{self}; /// Minimal capacity of fresh buffers allocated to encode traces, in bytes. const MIN_BUFFER_CAPACITY: usize = 1024; @@ -59,7 +59,7 @@ impl TraceSerializer { let chunks = payload.size(); let headers = self.build_traces_headers(header_tags, chunks, agent_payload_response_version); - let mp_payload = self.serialize_payload(&payload, metadata)?; + let mp_payload = self.serialize_payload(&payload, metadata, output_format)?; Ok(PreparedTracesPayload { data: mp_payload, @@ -78,12 +78,19 @@ impl TraceSerializer { TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) }; match output_format { - TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)), - TraceExporterOutputFormat::V04 => { - trace_utils::collect_trace_chunks(traces, TraceEncoding::V04).map_err(map_err) + // v0.4 input spans are kept as-is in `TraceChunks::V04`. Whether they go out as v0.4 + // or are cross-encoded into V1 on the wire is decided in `serialize_payload`. + // + // APMSP-2812 - TODO: when the data-pipeline gains a V1-native input model (its own + // `v1::Span`-shaped builder), route `OutputFormat::V1` to + // `TraceChunks::V1(v1::TracerPayload)` instead and serialize via + // `to_vec_from_payload_v1`. A `StatSpan` impl on `v1::Span` will also be needed + // if client-side stats are enabled on the V1-native path. + TraceExporterOutputFormat::V04 | TraceExporterOutputFormat::V1 => { + Ok(tracer_payload::TraceChunks::V04(traces)) } TraceExporterOutputFormat::V05 => { - trace_utils::collect_trace_chunks(traces, TraceEncoding::V05).map_err(map_err) + trace_utils::convert_trace_chunks_v04_to_v05(traces).map_err(map_err) } } } @@ -113,23 +120,44 @@ impl TraceSerializer { &self, payload: &tracer_payload::TraceChunks, metadata: &TracerMetadata, + output_format: TraceExporterOutputFormat, ) -> Result, TraceExporterError> { let capacity = self .previous_serialised_len .load(Ordering::Relaxed) .max(MIN_BUFFER_CAPACITY); - let buff = match payload { - tracer_payload::TraceChunks::V04(p) => { + let buff = match (payload, output_format) { + (tracer_payload::TraceChunks::V04(p), TraceExporterOutputFormat::V04) => { msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32) } - tracer_payload::TraceChunks::V05(p) => { + // v0.4 spans cross-encoded as V1 on the wire (used when the agent advertises + // /v1.0/traces). + (tracer_payload::TraceChunks::V04(p), TraceExporterOutputFormat::V1) => { + msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata) + } + (tracer_payload::TraceChunks::V05(p), TraceExporterOutputFormat::V05) => { let mut buff = Vec::with_capacity(capacity); rmp_serde::encode::write(&mut buff, p) .map_err(TraceExporterError::Serialization)?; buff } - tracer_payload::TraceChunks::V1(p) => { - msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata) + // APMSP-2812 - TODO: native V1 input — call + // `msgpack_encoder::v1::to_vec_from_payload_v1` on the carried + // `v1::TracerPayload`. Not yet reachable: `collect_and_process_traces` + // never produces `TraceChunks::V1` in the current data-pipeline path. + #[allow(clippy::unimplemented)] + (tracer_payload::TraceChunks::V1(_), TraceExporterOutputFormat::V1) => { + unimplemented!("Native V1 input serialization not yet implemented (APMSP-2812)") + } + // `collect_and_process_traces` only produces (V04, V04|V1), (V05, V05), + // or (V1, V1) — any other combination here is a programming error. + _ => { + return Err(TraceExporterError::Deserialization( + DecodeError::InvalidFormat( + "Unsupported (TraceChunks, OutputFormat) combination for serialization" + .to_owned(), + ), + )); } }; self.previous_serialised_len @@ -277,7 +305,11 @@ mod tests { .collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V04) .unwrap(); - let result = serializer.serialize_payload(&payload, &TracerMetadata::default()); + let result = serializer.serialize_payload( + &payload, + &TracerMetadata::default(), + TraceExporterOutputFormat::V04, + ); assert!(result.is_ok()); let serialized = result.unwrap(); @@ -312,7 +344,11 @@ mod tests { .collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V05) .unwrap(); - let result = serializer.serialize_payload(&payload, &TracerMetadata::default()); + let result = serializer.serialize_payload( + &payload, + &TracerMetadata::default(), + TraceExporterOutputFormat::V05, + ); assert!(result.is_ok()); let serialized = result.unwrap(); diff --git a/libdd-trace-utils/src/msgpack_decoder/mod.rs b/libdd-trace-utils/src/msgpack_decoder/mod.rs index 6a07361389..d22f2fd788 100644 --- a/libdd-trace-utils/src/msgpack_decoder/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/mod.rs @@ -4,3 +4,4 @@ pub mod decode; pub mod v04; pub mod v05; +pub mod v1; diff --git a/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs new file mode 100644 index 0000000000..a91568bd08 --- /dev/null +++ b/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs @@ -0,0 +1,658 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub(super) mod span; + +use crate::msgpack_decoder::decode::buffer::Buffer; +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::v1::{TraceChunk, TracerPayload, TracerPayloadBytes, TracerPayloadSlice}; +use crate::span::DeserializableTraceData; +use rmp::decode; +use std::borrow::Borrow; + +// Integer keys used by the V1 wire format. Kept in sync with the encoder side +// (`msgpack_encoder::v1::{trace_key, chunk_key, SpanKey, SpanLinkKey, SpanEventKey, AnyValueKey}`). + +pub(super) mod trace_key { + pub const LANGUAGE_NAME: u8 = 3; + pub const LANGUAGE_VERSION: u8 = 4; + pub const TRACER_VERSION: u8 = 5; + pub const RUNTIME_ID: u8 = 6; + pub const ENV_REF: u8 = 7; + pub const HOSTNAME_REF: u8 = 8; + pub const APP_VERSION_REF: u8 = 9; + pub const ATTRIBUTES: u8 = 10; + pub const CHUNKS: u8 = 11; +} + +pub(super) mod chunk_key { + pub const PRIORITY: u8 = 1; + pub const ORIGIN: u8 = 2; + pub const ATTRIBUTES: u8 = 3; + pub const SPANS: u8 = 4; + pub const DROPPED_TRACE: u8 = 5; + pub const TRACE_ID: u8 = 6; + pub const SAMPLING_MECHANISM: u8 = 7; +} + +pub(super) mod span_key { + pub const SERVICE: u8 = 1; + pub const NAME: u8 = 2; + pub const RESOURCE: u8 = 3; + pub const SPAN_ID: u8 = 4; + pub const PARENT_ID: u8 = 5; + pub const START: u8 = 6; + pub const DURATION: u8 = 7; + pub const ERROR: u8 = 8; + pub const ATTRIBUTES: u8 = 9; + pub const TYPE: u8 = 10; + pub const SPAN_LINKS: u8 = 11; + pub const SPAN_EVENTS: u8 = 12; + pub const ENV: u8 = 13; + pub const VERSION: u8 = 14; + pub const COMPONENT: u8 = 15; + pub const KIND: u8 = 16; +} + +pub(super) mod span_link_key { + pub const TRACE_ID: u8 = 1; + pub const SPAN_ID: u8 = 2; + pub const ATTRIBUTES: u8 = 3; + pub const TRACE_STATE: u8 = 4; + pub const FLAGS: u8 = 5; +} + +pub(super) mod span_event_key { + pub const TIME: u8 = 1; + pub const NAME: u8 = 2; + pub const ATTRIBUTES: u8 = 3; +} + +pub(super) const ANY_VALUE_KEY_STRING: u8 = 1; +pub(super) const ANY_VALUE_KEY_BOOL: u8 = 2; +pub(super) const ANY_VALUE_KEY_DOUBLE: u8 = 3; +pub(super) const ANY_VALUE_KEY_INT64: u8 = 4; +pub(super) const ANY_VALUE_KEY_BYTES: u8 = 5; +pub(super) const ANY_VALUE_KEY_ARRAY: u8 = 6; +pub(super) const ANY_VALUE_KEY_KEY_VALUE_LIST: u8 = 7; + +/// Number of msgpack items consumed per `[type, value]` pair in a typed `Array`. +pub(super) const TYPED_VALUE_STRIDE: u32 = 2; + +/// Number of msgpack items consumed per `[key, type, value]` triplet in a typed attributes map. +pub(super) const FLAT_ATTR_STRIDE: u32 = 3; + +/// Streaming string intern table built up as the payload is decoded. +/// +/// V1 strings are encoded inline the first time they appear (as msgpack `str`), and as a +/// msgpack `uint` reference on every subsequent occurrence. ID 0 is reserved for the empty +/// string and is pre-inserted on construction. +pub(super) struct StringTable +where + T::Text: Clone, +{ + seen: Vec, +} + +impl StringTable +where + T::Text: Clone, +{ + pub(super) fn new() -> Self { + Self { + seen: vec![T::Text::default()], + } + } + + /// Resolves a string reference by ID (encoded inline as msgpack `uint`). + fn resolve(&self, id: u64) -> Result { + usize::try_from(id) + .ok() + .and_then(|i| self.seen.get(i).cloned()) + .ok_or_else(|| { + DecodeError::InvalidFormat(format!( + "V1 string table reference out of range: id={id}, table_len={}", + self.seen.len() + )) + }) + } + + /// Records a freshly-read inline string and returns it (cloned for reuse). + fn record(&mut self, s: T::Text) -> T::Text { + self.seen.push(s.clone()); + s + } +} + +/// Reads a string-or-reference value at the current buffer position. +/// +/// Decides based on the next msgpack marker: +/// - `str`/`fixstr` → read and intern, return the value +/// - any unsigned int marker → resolve the table reference +pub(super) fn read_interned_string( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result +where + T::Text: Clone, +{ + let slice: &[u8] = buf.as_mut_slice(); + let marker_byte = *slice.first().ok_or_else(|| { + DecodeError::InvalidFormat( + "Unexpected end of V1 buffer when reading interned string".to_owned(), + ) + })?; + + // msgpack markers: + // fixstr : 0xa0..=0xbf + // str8/str16/str32 : 0xd9, 0xda, 0xdb + // fixint (positive): 0x00..=0x7f + // uint8/16/32/64 : 0xcc, 0xcd, 0xce, 0xcf + let is_string = matches!(marker_byte, 0xa0..=0xbf | 0xd9 | 0xda | 0xdb); + let is_uint = matches!(marker_byte, 0x00..=0x7f | 0xcc | 0xcd | 0xce | 0xcf); + + if is_string { + let s = buf.read_string()?; + Ok(table.record(s)) + } else if is_uint { + let id: u64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 interned string reference uint read failure".to_owned()) + })?; + table.resolve(id) + } else { + Err(DecodeError::InvalidFormat(format!( + "Unexpected msgpack marker 0x{marker_byte:02x} for V1 interned string" + ))) + } +} + +/// Decodes a V1 msgpack payload from owned bytes into a [`TracerPayloadBytes`]. +/// +/// # Returns +/// +/// * `Ok((payload, payload_size))` — the decoded payload and the number of bytes consumed from the +/// buffer. +/// * `Err(DecodeError)` — if the payload is malformed. +/// +/// # Errors +/// +/// Returns an error for any malformed map / array length, unknown map key, missing required +/// field, or any embedded msgpack read failure. +pub fn from_bytes( + data: libdd_tinybytes::Bytes, +) -> Result<(TracerPayloadBytes, usize), DecodeError> { + from_buffer(&mut Buffer::new(data)) +} + +/// Decodes a V1 msgpack payload from a borrowed slice into a [`TracerPayloadSlice`]. +/// The resulting payload borrows from the input buffer (same lifetime). +pub fn from_slice(data: &[u8]) -> Result<(TracerPayloadSlice<'_>, usize), DecodeError> { + from_buffer(&mut Buffer::new(data)) +} + +/// Generic over the deserialization mode (owned `BytesData` or borrowed `SliceData`). +#[allow(clippy::type_complexity)] +pub fn from_buffer( + data: &mut Buffer, +) -> Result<(TracerPayload, usize), DecodeError> +where + T::Text: Clone, +{ + let start_len = data.len(); + let mut table = StringTable::::new(); + let payload = decode_payload(data, &mut table)?; + let consumed = start_len - data.len(); + Ok((payload, consumed)) +} + +/// Decodes the top-level V1 payload map: tracer metadata fields + chunks array. +fn decode_payload( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("Unable to read V1 payload map len".to_owned()))?; + + let mut payload = TracerPayload::::default(); + let mut saw_chunks = false; + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 payload key (u8) read failure".to_owned()) + })?; + match key { + trace_key::CHUNKS => { + payload.chunks = decode_chunks(buf, table)?; + saw_chunks = true; + } + trace_key::LANGUAGE_NAME => payload.language_name = read_interned_string(buf, table)?, + trace_key::LANGUAGE_VERSION => { + payload.language_version = read_interned_string(buf, table)? + } + trace_key::TRACER_VERSION => payload.tracer_version = read_interned_string(buf, table)?, + trace_key::RUNTIME_ID => payload.runtime_id = read_interned_string(buf, table)?, + trace_key::ENV_REF => payload.env = read_interned_string(buf, table)?, + trace_key::HOSTNAME_REF => payload.hostname = read_interned_string(buf, table)?, + trace_key::APP_VERSION_REF => payload.app_version = read_interned_string(buf, table)?, + trace_key::ATTRIBUTES => { + payload.attributes = span::read_attributes_map(buf, table)?; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 payload key: {unknown}" + ))); + } + } + } + + if !saw_chunks { + return Err(DecodeError::InvalidFormat( + "V1 payload is missing the chunks field".to_owned(), + )); + } + + Ok(payload) +} + +fn decode_chunks( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result>, DecodeError> +where + T::Text: Clone, +{ + let count = decode::read_array_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 chunks array len read failure".to_owned()))?; + let mut chunks = Vec::with_capacity(count as usize); + for _ in 0..count { + chunks.push(decode_chunk(buf, table)?); + } + Ok(chunks) +} + +fn decode_chunk( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 chunk map len read failure".to_owned()))?; + let mut chunk = TraceChunk::::default(); + let mut saw_trace_id = false; + let mut saw_spans = false; + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 chunk key (u8) read failure".to_owned()))?; + match key { + chunk_key::TRACE_ID => { + let len = decode::read_bin_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 chunk trace_id bin len read failure".to_owned()) + })?; + if len != 16 { + return Err(DecodeError::InvalidFormat(format!( + "V1 chunk trace_id must be 16 bytes, got {len}" + ))); + } + let bytes = buf.try_slice_and_advance(16).ok_or_else(|| { + DecodeError::InvalidFormat("V1 chunk trace_id payload truncated".to_owned()) + })?; + let slice: &[u8] = bytes.borrow(); + chunk.trace_id.copy_from_slice(slice); + saw_trace_id = true; + } + chunk_key::SPANS => { + let count = decode::read_array_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 chunk spans array len read failure".to_owned()) + })?; + let mut spans = Vec::with_capacity(count as usize); + for _ in 0..count { + spans.push(span::decode_span(buf, table)?); + } + chunk.spans = spans; + saw_spans = true; + } + chunk_key::ORIGIN => chunk.origin = read_interned_string(buf, table)?, + chunk_key::PRIORITY => { + let v: i64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 chunk priority read failure".to_owned()) + })?; + chunk.priority = Some(v as i32); + } + chunk_key::SAMPLING_MECHANISM => { + let v: u64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat( + "V1 chunk sampling_mechanism read failure".to_owned(), + ) + })?; + chunk.sampling_mechanism = Some(v as u32); + } + chunk_key::ATTRIBUTES => { + chunk.attributes = span::read_attributes_map(buf, table)?; + } + chunk_key::DROPPED_TRACE => { + chunk.dropped_trace = decode::read_bool(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat( + "V1 chunk dropped_trace bool read failure".to_owned(), + ) + })?; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 chunk key: {unknown}" + ))); + } + } + } + + if !saw_trace_id { + return Err(DecodeError::InvalidFormat( + "V1 chunk is missing trace_id".to_owned(), + )); + } + if !saw_spans { + return Err(DecodeError::InvalidFormat( + "V1 chunk is missing spans array".to_owned(), + )); + } + + Ok(chunk) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::msgpack_encoder::v1::to_vec_from_payload_v1; + use crate::span::v1::{ + AttributeValue, Span as V1Span, SpanBytes as V1SpanBytes, SpanKind, TraceChunkBytes, + TracerPayloadBytes, + }; + use crate::span::vec_map::VecMap; + use bolero::check; + use libdd_tinybytes::{Bytes, BytesString}; + + fn bs(s: &str) -> BytesString { + BytesString::from_slice(s.as_bytes()).expect("test string must fit in BytesString") + } + + fn sample_payload() -> TracerPayloadBytes { + let mut attrs = VecMap::>::new(); + attrs.insert(bs("http.method"), AttributeValue::String(bs("GET"))); + attrs.insert(bs("http.status"), AttributeValue::Int(200)); + attrs.insert(bs("is_root"), AttributeValue::Bool(true)); + attrs.insert(bs("ratio"), AttributeValue::Float(0.75)); + attrs.insert( + bs("ids"), + AttributeValue::List(vec![AttributeValue::Int(1), AttributeValue::Int(2)]), + ); + + let span = V1Span { + service: bs("svc"), + name: bs("GET /users"), + resource: bs("/users"), + r#type: bs("web"), + span_id: 42, + parent_id: 7, + start: 1_700_000_000_000, + duration: 1_500, + error: true, + span_kind: SpanKind::Server, + env: bs("prod"), + version: bs("1.2.3"), + component: bs("net/http"), + attributes: attrs, + ..Default::default() + }; + + let mut chunk_attrs = VecMap::>::new(); + chunk_attrs.insert(bs("_dd.p.dm"), AttributeValue::String(bs("-1"))); + + let chunk = TraceChunkBytes { + trace_id: [1u8; 16], + priority: Some(1), + origin: bs("synthetic"), + sampling_mechanism: Some(2), + dropped_trace: false, + attributes: chunk_attrs, + spans: vec![span], + }; + + TracerPayloadBytes { + language_name: bs("rust"), + language_version: bs("1.87"), + tracer_version: bs("9.9.9"), + runtime_id: bs("abcd-1234"), + env: bs("prod"), + hostname: bs("host-1"), + app_version: bs("1.2.3"), + chunks: vec![chunk], + ..Default::default() + } + } + + #[test] + fn roundtrip_full_payload() { + let original = sample_payload(); + let bytes = to_vec_from_payload_v1(&original); + let payload_len = bytes.len(); + let (decoded, consumed) = + from_bytes(Bytes::from(bytes)).expect("decoder should succeed on encoder output"); + + assert_eq!(consumed, payload_len, "decoder should consume all bytes"); + + // Tracer-level metadata + assert_eq!(decoded.language_name.as_str(), "rust"); + assert_eq!(decoded.language_version.as_str(), "1.87"); + assert_eq!(decoded.tracer_version.as_str(), "9.9.9"); + assert_eq!(decoded.runtime_id.as_str(), "abcd-1234"); + assert_eq!(decoded.env.as_str(), "prod"); + assert_eq!(decoded.hostname.as_str(), "host-1"); + assert_eq!(decoded.app_version.as_str(), "1.2.3"); + + // Chunk + assert_eq!(decoded.chunks.len(), 1); + let chunk = &decoded.chunks[0]; + assert_eq!(chunk.trace_id, [1u8; 16]); + assert_eq!(chunk.priority, Some(1)); + assert_eq!(chunk.sampling_mechanism, Some(2)); + assert_eq!(chunk.origin.as_str(), "synthetic"); + assert_eq!(chunk.attributes.len(), 1); + + // Span + assert_eq!(chunk.spans.len(), 1); + let span = &chunk.spans[0]; + assert_eq!(span.service.as_str(), "svc"); + assert_eq!(span.name.as_str(), "GET /users"); + assert_eq!(span.resource.as_str(), "/users"); + assert_eq!(span.r#type.as_str(), "web"); + assert_eq!(span.span_id, 42); + assert_eq!(span.parent_id, 7); + assert_eq!(span.start, 1_700_000_000_000); + assert_eq!(span.duration, 1_500); + assert!(span.error); + assert_eq!(span.span_kind, SpanKind::Server); + assert_eq!(span.env.as_str(), "prod"); + assert_eq!(span.version.as_str(), "1.2.3"); + assert_eq!(span.component.as_str(), "net/http"); + assert_eq!(span.attributes.len(), 5); + } + + #[test] + fn empty_payload_roundtrip() { + let original = TracerPayloadBytes::default(); + let bytes = to_vec_from_payload_v1(&original); + let (decoded, _) = + from_bytes(Bytes::from(bytes)).expect("decoder should succeed on empty payload"); + assert!(decoded.chunks.is_empty()); + assert!(decoded.language_name.as_str().is_empty()); + } + + #[test] + fn missing_chunks_field_is_rejected() { + // Manually encode a payload map with only one entry (env), no chunks field. + // `0x81` = fixmap len 1, key 0x07 (ENV_REF), value = inline str "x" (`0xa1 0x78`). + let bytes = vec![0x81, 0x07, 0xa1, 0x78]; + let err = from_bytes(Bytes::from(bytes)).expect_err("missing chunks must error"); + assert!(matches!(err, DecodeError::InvalidFormat(_))); + } + + #[test] + fn string_interning_resolves_across_chunks() { + // Two chunks sharing the same service name. The decoded service strings must both + // be "shared" — verifying the streaming string table is preserved across chunks. + let span_a = V1Span { + service: bs("shared"), + name: bs("a"), + span_id: 1, + start: 1, + ..Default::default() + }; + let span_b = V1Span { + service: bs("shared"), + name: bs("b"), + span_id: 2, + start: 1, + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![ + TraceChunkBytes { + trace_id: [1u8; 16], + spans: vec![span_a], + ..Default::default() + }, + TraceChunkBytes { + trace_id: [2u8; 16], + spans: vec![span_b], + ..Default::default() + }, + ], + ..Default::default() + }; + let bytes = to_vec_from_payload_v1(&payload); + let (decoded, _) = + from_bytes(Bytes::from(bytes)).expect("decoder should resolve interned strings"); + assert_eq!(decoded.chunks[0].spans[0].service.as_str(), "shared"); + assert_eq!(decoded.chunks[1].spans[0].service.as_str(), "shared"); + } + + #[test] + fn nested_keyvalue_attribute_roundtrip() { + let mut inner = VecMap::>::new(); + inner.insert(bs("k"), AttributeValue::String(bs("v"))); + let mut attrs = VecMap::>::new(); + attrs.insert(bs("nested"), AttributeValue::KeyValue(inner)); + + let span = V1Span { + service: bs("svc"), + name: bs("op"), + span_id: 1, + start: 1, + attributes: attrs, + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + spans: vec![span], + ..Default::default() + }], + ..Default::default() + }; + let bytes = to_vec_from_payload_v1(&payload); + let (decoded, _) = from_bytes(Bytes::from(bytes)).expect("nested KeyValue roundtrip"); + + let decoded_attrs = &decoded.chunks[0].spans[0].attributes; + match decoded_attrs.get(&bs("nested")) { + Some(AttributeValue::KeyValue(map)) => { + assert_eq!(map.len(), 1); + match map.get(&bs("k")) { + Some(AttributeValue::String(v)) => assert_eq!(v.as_str(), "v"), + _ => panic!("inner value should be String"), + } + } + _ => panic!("attribute should decode as KeyValue"), + } + } + + /// Fuzz test: bolero generates random strings + numbers for the V1 payload, the encoder + /// serialises it, and the decoder must accept its own output (no panic, no error). Mirrors + /// the v04 `fuzz_from_bytes` pattern. Bolero caps tuples at 12 fields — extra metadata is + /// either omitted or filled with deterministic defaults. + #[test] + #[cfg_attr(miri, ignore)] + fn fuzz_from_bytes() { + check!() + .with_type::<( + String, // language_name + String, // env (payload-level) + String, // service + String, // name + String, // resource + String, // span env + String, // attr_key + String, // attr_value + u64, // span_id + u64, // parent_id + u64, // start + bool, // error + )>() + .cloned() + .for_each( + |( + lang, + payload_env, + service, + name, + resource, + span_env, + attr_key, + attr_value, + span_id, + parent_id, + start, + error, + )| { + let bs = |s: &str| BytesString::from_slice(s.as_ref()).unwrap(); + let mut attrs = VecMap::>::new(); + attrs.insert(bs(&attr_key), AttributeValue::String(bs(&attr_value))); + + let span = V1SpanBytes { + service: bs(&service), + name: bs(&name), + resource: bs(&resource), + span_id, + parent_id, + start: start as i64, + error, + env: bs(&span_env), + attributes: attrs, + ..Default::default() + }; + + let payload = TracerPayloadBytes { + language_name: bs(&lang), + env: bs(&payload_env), + chunks: vec![TraceChunkBytes { + trace_id: [0xab; 16], + spans: vec![span], + ..Default::default() + }], + ..Default::default() + }; + + let encoded = to_vec_from_payload_v1(&payload); + let result = from_bytes(Bytes::from(encoded)); + assert!( + result.is_ok(), + "decoder rejected its own encoded output: {result:?}" + ); + }, + ); + } +} diff --git a/libdd-trace-utils/src/msgpack_decoder/v1/span.rs b/libdd-trace-utils/src/msgpack_decoder/v1/span.rs new file mode 100644 index 0000000000..16822f959e --- /dev/null +++ b/libdd-trace-utils/src/msgpack_decoder/v1/span.rs @@ -0,0 +1,330 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +// Naming convention: parent module (`v1`) = input wire format being decoded. This file decodes +// a V1 msgpack span into a [`crate::span::v1::Span`]. + +use super::{ + read_interned_string, span_event_key, span_key, span_link_key, StringTable, + ANY_VALUE_KEY_ARRAY, ANY_VALUE_KEY_BOOL, ANY_VALUE_KEY_BYTES, ANY_VALUE_KEY_DOUBLE, + ANY_VALUE_KEY_INT64, ANY_VALUE_KEY_KEY_VALUE_LIST, ANY_VALUE_KEY_STRING, FLAT_ATTR_STRIDE, + TYPED_VALUE_STRIDE, +}; +use crate::msgpack_decoder::decode::buffer::Buffer; +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::v1::{AttributeValue, Span, SpanEvent, SpanKind, SpanLink, ThinVec}; +use crate::span::vec_map::VecMap; +use crate::span::DeserializableTraceData; +use rmp::decode; +use std::borrow::Borrow; + +/// Decodes a V1 span (msgpack map with integer keys) into a [`Span`]. +/// +/// The streaming `StringTable` is shared across the whole payload, so interned references in +/// this span can resolve to strings that appeared in an earlier chunk or payload header. +pub(super) fn decode_span( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("Unable to read V1 span map len".to_owned()))?; + + let mut span = Span::::default(); + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span key (u8) read failure".to_owned()))?; + + match key { + span_key::SERVICE => span.service = read_interned_string(buf, table)?, + span_key::NAME => span.name = read_interned_string(buf, table)?, + span_key::RESOURCE => span.resource = read_interned_string(buf, table)?, + span_key::SPAN_ID => { + span.span_id = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_id u64 read failure".to_owned()) + })? + } + span_key::PARENT_ID => { + span.parent_id = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 parent_id u64 read failure".to_owned()) + })? + } + span_key::START => { + span.start = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span start u64 read failure".to_owned()) + })? as i64; + } + span_key::DURATION => { + span.duration = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span duration u64 read failure".to_owned()) + })? as i64; + } + span_key::ERROR => { + span.error = decode::read_bool(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span error bool read failure".to_owned()) + })?; + } + span_key::TYPE => span.r#type = read_interned_string(buf, table)?, + span_key::ATTRIBUTES => span.attributes = read_attributes_map(buf, table)?, + span_key::SPAN_LINKS => span.span_links = read_span_links(buf, table)?, + span_key::SPAN_EVENTS => span.span_events = read_span_events(buf, table)?, + span_key::ENV => span.env = read_interned_string(buf, table)?, + span_key::VERSION => span.version = read_interned_string(buf, table)?, + span_key::COMPONENT => span.component = read_interned_string(buf, table)?, + span_key::KIND => { + let kind: u32 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_kind u32 read failure".to_owned()) + })?; + // OTEL spec: unset / unknown → Internal. + span.span_kind = match kind { + 2 => SpanKind::Server, + 3 => SpanKind::Client, + 4 => SpanKind::Producer, + 5 => SpanKind::Consumer, + _ => SpanKind::Internal, + }; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 span key: {unknown}" + ))); + } + } + } + + span.attributes.mark_deduped(); + + Ok(span) +} + +/// Reads a V1 attributes map encoded as a flat array of `[key, type_uint8, value, ...]` triplets. +pub(super) fn read_attributes_map( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result>, DecodeError> +where + T::Text: Clone, +{ + let flat_len = decode::read_array_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attributes flat array len read failure".to_owned()) + })?; + + if flat_len % FLAT_ATTR_STRIDE != 0 { + return Err(DecodeError::InvalidFormat(format!( + "V1 attributes flat array length {flat_len} is not a multiple of {FLAT_ATTR_STRIDE}" + ))); + } + + let entries = (flat_len / FLAT_ATTR_STRIDE) as usize; + let mut map = VecMap::with_capacity(entries); + + for _ in 0..entries { + let key = read_interned_string(buf, table)?; + let value = read_typed_attribute_value(buf, table)?; + map.insert(key, value); + } + + map.mark_deduped(); + + Ok(map) +} + +/// Reads `[type_uint8, value]` and dispatches by type discriminant. Recurses into `Array` and +/// `KeyValueList`. +pub(super) fn read_typed_attribute_value( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let ty = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute type discriminant read failure".to_owned()) + })?; + + match ty { + ANY_VALUE_KEY_STRING => Ok(AttributeValue::String(read_interned_string(buf, table)?)), + ANY_VALUE_KEY_BOOL => { + let b = decode::read_bool(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute Bool read failure".to_owned()) + })?; + Ok(AttributeValue::Bool(b)) + } + ANY_VALUE_KEY_DOUBLE => { + let f = decode::read_f64(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute Double read failure".to_owned()) + })?; + Ok(AttributeValue::Float(f)) + } + ANY_VALUE_KEY_INT64 => { + // Encoder writes signed via `write_sint`, which can emit any int marker. `read_int` + // accepts any integer marker that fits in i64. + let i: i64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute Int64 read failure".to_owned()) + })?; + Ok(AttributeValue::Int(i)) + } + ANY_VALUE_KEY_BYTES => Ok(AttributeValue::Bytes(read_bin(buf)?)), + ANY_VALUE_KEY_ARRAY => { + let array_len_with_stride = + decode::read_array_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute Array len read failure".to_owned()) + })?; + // Array is a flat sequence of [type, value] pairs. + if array_len_with_stride % TYPED_VALUE_STRIDE != 0 { + return Err(DecodeError::InvalidFormat(format!( + "V1 attribute Array length {array_len_with_stride} is not a multiple of {TYPED_VALUE_STRIDE}" + ))); + } + let n = (array_len_with_stride / TYPED_VALUE_STRIDE) as usize; + let mut items = Vec::with_capacity(n); + for _ in 0..n { + items.push(read_typed_attribute_value(buf, table)?); + } + Ok(AttributeValue::List(items)) + } + ANY_VALUE_KEY_KEY_VALUE_LIST => { + Ok(AttributeValue::KeyValue(read_attributes_map(buf, table)?)) + } + unknown => Err(DecodeError::InvalidFormat(format!( + "Unknown V1 AnyValue type discriminant: {unknown}" + ))), + } +} + +/// Reads a msgpack `bin` and slices the matching range out of the buffer. +fn read_bin(buf: &mut Buffer) -> Result { + let len = decode::read_bin_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 bin len read failure".to_owned()))?; + buf.try_slice_and_advance(len as usize) + .ok_or_else(|| DecodeError::InvalidFormat("V1 bin payload truncated".to_owned())) +} + +/// Reads the span_links array. The `SpanLinks` map key has already been consumed by the caller. +pub(super) fn read_span_links( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result>, DecodeError> +where + T::Text: Clone, +{ + let count = decode::read_array_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span_links len read failure".to_owned()))?; + let mut links = ThinVec::with_capacity(count as usize); + for _ in 0..count { + links.push(decode_span_link(buf, table)?); + } + Ok(links) +} + +fn decode_span_link( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span_link map len read failure".to_owned()))?; + let mut link = SpanLink::::default(); + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_link key (u8) read failure".to_owned()) + })?; + match key { + span_link_key::TRACE_ID => { + let bytes = read_bin(buf)?; + let slice: &[u8] = bytes.borrow(); + if slice.len() != 16 { + return Err(DecodeError::InvalidFormat(format!( + "V1 span_link trace_id expected 16 bytes, got {}", + slice.len() + ))); + } + let mut arr = [0u8; 16]; + arr.copy_from_slice(slice); + link.trace_id = arr; + } + span_link_key::SPAN_ID => { + link.span_id = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_link span_id read failure".to_owned()) + })?; + } + span_link_key::ATTRIBUTES => { + link.attributes = read_attributes_map(buf, table)?; + } + span_link_key::TRACE_STATE => { + link.tracestate = read_interned_string(buf, table)?; + } + span_link_key::FLAGS => { + let v: u64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_link flags read failure".to_owned()) + })?; + link.flags = v as u32; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 span_link key: {unknown}" + ))); + } + } + } + Ok(link) +} + +pub(super) fn read_span_events( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result>, DecodeError> +where + T::Text: Clone, +{ + let count = decode::read_array_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span_events len read failure".to_owned()))?; + let mut events = ThinVec::with_capacity(count as usize); + for _ in 0..count { + events.push(decode_span_event(buf, table)?); + } + Ok(events) +} + +fn decode_span_event( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span_event map len read failure".to_owned()))?; + let mut event = SpanEvent::::default(); + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_event key (u8) read failure".to_owned()) + })?; + match key { + span_event_key::TIME => { + event.time_unix_nano = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_event time read failure".to_owned()) + })?; + } + span_event_key::NAME => { + event.name = read_interned_string(buf, table)?; + } + span_event_key::ATTRIBUTES => { + event.attributes = read_attributes_map(buf, table)?; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 span_event key: {unknown}" + ))); + } + } + } + Ok(event) +} diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index 53d14d8ecd..cb6e7fe2f0 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -407,6 +407,24 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; + futures.push(self.send_payload( + capabilities, + chunks, + payload, + headers, + endpoint.as_ref(), + )); + } + TracerPayloadCollection::V1(payload) => { + #[allow(clippy::unwrap_used)] + let chunks = u64::try_from(self.tracer_payloads.size()).unwrap(); + let mut headers = self.headers.clone(); + headers.reserve(2); + headers.insert(DATADOG_TRACE_COUNT, chunks.into()); + headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK); + + let payload = msgpack_encoder::v1::to_vec_from_payload_v1(payload); + futures.push(self.send_payload( capabilities, chunks, @@ -537,6 +555,9 @@ mod tests { msgpack_encoder::v04::to_encoded_byte_len(payloads) as usize } TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(), + TracerPayloadCollection::V1(payload) => { + msgpack_encoder::v1::to_encoded_byte_len_from_payload_v1(payload) as usize + } } } diff --git a/libdd-trace-utils/src/trace_utils.rs b/libdd-trace-utils/src/trace_utils.rs index f790ec069c..6fd439a2ee 100644 --- a/libdd-trace-utils/src/trace_utils.rs +++ b/libdd-trace-utils/src/trace_utils.rs @@ -7,7 +7,7 @@ use crate::span::v05::dict::SharedDict; use crate::span::{v05, TraceData}; pub use crate::tracer_header_tags::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; -use crate::tracer_payload::{self, TraceChunks, TraceEncoding}; +use crate::tracer_payload::{self, TraceChunks}; use anyhow::anyhow; use bytes::buf::Reader; use bytes::Buf; @@ -585,25 +585,28 @@ pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) { } } -pub fn collect_trace_chunks( +/// Converts v0.4-shaped span chunks into the v0.5 wire representation. +/// +/// v0.5 deduplicates every string field across the whole payload through a shared dictionary +/// and replaces them with `u32` indices. This walks each span via [`v05::from_v04_span`], +/// interning strings into the [`SharedDict`] as it goes, and returns the resulting +/// `(dict, traces)` pair wrapped in [`TraceChunks::V05`]. +/// +/// Returns `Err` if any span fails to convert (e.g. unsupported field value); the partial +/// dictionary built so far is discarded. +pub fn convert_trace_chunks_v04_to_v05( traces: Vec>>, - format: TraceEncoding, ) -> anyhow::Result> { - match format { - TraceEncoding::V05 => { - let mut shared_dict = SharedDict::default(); - let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); - for trace in traces { - let v05_trace = trace - .into_iter() - .map(|span| v05::from_v04_span(span, &mut shared_dict)) - .collect::>>()?; - v05_traces.push(v05_trace); - } - Ok(TraceChunks::V05((shared_dict, v05_traces))) - } - TraceEncoding::V04 => Ok(TraceChunks::V04(traces)), + let mut shared_dict = SharedDict::default(); + let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); + for trace in traces { + let v05_trace = trace + .into_iter() + .map(|span| v05::from_v04_span(span, &mut shared_dict)) + .collect::>>()?; + v05_traces.push(v05_trace); } + Ok(TraceChunks::V05((shared_dict, v05_traces))) } pub fn collect_pb_trace_chunks( @@ -1121,10 +1124,10 @@ mod tests { } #[test] - fn test_collect_trace_chunks_v05() { + fn test_convert_trace_chunks_v04_to_v05() { let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)]; - let collection = collect_trace_chunks(vec![chunk], TraceEncoding::V05).unwrap(); + let collection = convert_trace_chunks_v04_to_v05(vec![chunk]).unwrap(); let (dict, traces) = match collection { TraceChunks::V05(payload) => payload, @@ -1195,27 +1198,6 @@ mod tests { ); } - #[test] - fn test_collect_trace_chunks_v04() { - let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)]; - - let collection = collect_trace_chunks(vec![chunk], TraceEncoding::V04).unwrap(); - - let traces = match collection { - TraceChunks::V04(traces) => traces, - _ => panic!("Unexpected type"), - }; - - assert_eq!(traces.len(), 1); - assert_eq!(traces[0].len(), 1); - let span = &traces[0][0]; - assert_eq!(span.trace_id, 123); - assert_eq!(span.span_id, 456); - assert_eq!(span.parent_id, 789); - assert_eq!(span.start, 1); - assert_eq!(span.error, 0); - } - #[test] fn test_rmp_serde_deserialize_meta_with_null_values() { // Create a JSON representation with null value in meta diff --git a/libdd-trace-utils/src/tracer_payload.rs b/libdd-trace-utils/src/tracer_payload.rs index 035366fd80..99677c7ddb 100644 --- a/libdd-trace-utils/src/tracer_payload.rs +++ b/libdd-trace-utils/src/tracer_payload.rs @@ -2,12 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::span::v05::dict::SharedDict; -use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData}; -use crate::trace_utils::collect_trace_chunks; +use crate::span::{v04, v05, v1, BytesData, SharedDictBytes, TraceData}; +use crate::trace_utils::convert_trace_chunks_v04_to_v05; use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads}; +use anyhow::Ok; use libdd_trace_protobuf::pb; use std::cmp::Ordering; use std::iter::Iterator; +use tracing::warn; pub type TracerPayloadV04 = Vec; pub type TracerPayloadV05 = Vec; @@ -19,6 +21,8 @@ pub enum TraceEncoding { V04, /// v0.5 encoding (TracerPayloadV05). V05, + /// v1 encoding (TracerPayloadV1). + V1, } #[derive(Debug)] @@ -28,7 +32,7 @@ pub enum TraceChunks { /// Collection of TraceChunkSpan with de-duplicated strings. V05((SharedDict, Vec>)), /// Collection of v0.4 spans to be serialized as a V1 msgpack payload. - V1(Vec>>), + V1(v1::TracerPayload), } impl TraceChunks { @@ -36,8 +40,7 @@ impl TraceChunks { match self { TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces), TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces), - // V1 uses the same underlying span structure as V04. - TraceChunks::V1(traces) => TracerPayloadCollection::V04(traces), + TraceChunks::V1(traces) => TracerPayloadCollection::V1(traces), } } } @@ -48,7 +51,7 @@ impl TraceChunks { match self { TraceChunks::V04(traces) => traces.len(), TraceChunks::V05((_, traces)) => traces.len(), - TraceChunks::V1(traces) => traces.len(), + TraceChunks::V1(trace) => trace.chunks.len(), } } } @@ -62,6 +65,8 @@ pub enum TracerPayloadCollection { V04(Vec>), /// Collection of TraceChunkSpan with de-duplicated strings. V05((SharedDictBytes, Vec>)), + // /// V0.4-shaped spans that must be serialized as a V1 msgpack payload on send. + V1(v1::TracerPayload), } impl TracerPayloadCollection { @@ -92,6 +97,18 @@ impl TracerPayloadCollection { dest.append(src) } } + TracerPayloadCollection::V1(dest) => { + if let TracerPayloadCollection::V1(src) = other { + // Same-target SendData entries are coalesced by + // trace_utils::coalesce_send_data, so both V1 payloads + // typically share tracer-level metadata. If all metadata + // fields match we append `src`'s chunks into `dest`; if any diverge we no-op + // (logging a warning) rather than silently dropping `src`'s metadata. + if metadata_matches_v1(dest, src) { + dest.chunks.append(&mut src.chunks); + } + } + } // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). #[allow(clippy::unimplemented)] TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"), @@ -144,6 +161,7 @@ impl TracerPayloadCollection { } TracerPayloadCollection::V04(collection) => collection.len(), TracerPayloadCollection::V05((_, collection)) => collection.len(), + TracerPayloadCollection::V1(collection) => collection.chunks.len(), } } } @@ -228,13 +246,63 @@ pub fn decode_to_trace_chunks( data: libdd_tinybytes::Bytes, encoding_type: TraceEncoding, ) -> Result<(TraceChunks, usize), anyhow::Error> { - let (data, size) = match encoding_type { - TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data), - TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data), + match encoding_type { + TraceEncoding::V04 => { + let (data, size) = msgpack_decoder::v04::from_bytes(data).map_err(|e| { + anyhow::format_err!("Error deserializing trace from request body: {e}") + })?; + Ok((TraceChunks::V04(data), size)) + } + TraceEncoding::V05 => { + let (data, size) = msgpack_decoder::v05::from_bytes(data).map_err(|e| { + anyhow::format_err!("Error deserializing trace from request body: {e}") + })?; + Ok((convert_trace_chunks_v04_to_v05(data)?, size)) + } + TraceEncoding::V1 => { + let (data, size) = msgpack_decoder::v1::from_bytes(data).map_err(|e| { + anyhow::format_err!("Error deserializing trace from request body: {e}") + })?; + Ok((TraceChunks::V1(data), size)) + } } - .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; +} - Ok((collect_trace_chunks(data, encoding_type)?, size)) +/// Returns `true` iff every tracer-level metadata string field of `src` matches `dest`. +/// +/// V1 payloads carry tracer metadata (env, hostname, language, …) inside the payload itself, so +/// merging two payloads whose metadata diverges would silently drop one set of values. Callers +/// use this to gate the merge: on a `false` return, append is skipped (no-op) and the two +/// payloads stay separate. A warning is logged listing the diverging fields so the situation +/// is observable rather than silent. +fn metadata_matches_v1( + dest: &v1::TracerPayload, + src: &v1::TracerPayload, +) -> bool { + let differing: Vec<&'static str> = [ + ("language_name", dest.language_name == src.language_name), + ( + "language_version", + dest.language_version == src.language_version, + ), + ("tracer_version", dest.tracer_version == src.tracer_version), + ("runtime_id", dest.runtime_id == src.runtime_id), + ("env", dest.env == src.env), + ("hostname", dest.hostname == src.hostname), + ("app_version", dest.app_version == src.app_version), + ] + .into_iter() + .filter_map(|(label, eq)| (!eq).then_some(label)) + .collect(); + + if !differing.is_empty() { + warn!( + "Skipping V1 TracerPayload append: diverging metadata fields {:?}", + differing + ); + return false; + } + true } #[cfg(test)] diff --git a/libdd-trace-utils/tests/snapshots/compare_v1_full_payload_snapshot_test.json b/libdd-trace-utils/tests/snapshots/compare_v1_full_payload_snapshot_test.json new file mode 100644 index 0000000000..265c35f962 --- /dev/null +++ b/libdd-trace-utils/tests/snapshots/compare_v1_full_payload_snapshot_test.json @@ -0,0 +1,60 @@ +[[ + { + "name": "test_send_data_v1_full_root", + "service": "test-service", + "resource": "/api/users", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "web", + "meta": { + "_dd.origin": "synthetics", + "_dd.p.dm": "-4", + "_dd.p.tid": "0123456789abcdef", + "component": "http", + "env": "test-env", + "http.method": "POST", + "span.kind": "server", + "version": "1.2.3" + }, + "metrics": { + "_sampling_priority_v1": 1, + "http.duration_ms": 42.75, + "http.status_code": 201, + "http.success": 1 + }, + "span_links": [ + { + "trace_id": 18364758544493064720, + "trace_id_high": 81985529216486895, + "span_id": 11574427654092267680, + "attributes": { + "link.reason": "retry" + }, + "tracestate": "dd=t.tid:abc", + "flags": 1 + } + ], + "span_events": [ + { + "time_unix_nano": 1727211691770715042, + "name": "exception", + "attributes": { + "exception.code": { + "type": 2, + "int_value": 500 + }, + "exception.handled": { + "type": 1, + "bool_value": false + }, + "exception.type": { + "type": 0, + "string_value": "RuntimeError" + } + } + } + ], + "duration": 5000, + "start": 1000000 + }]] diff --git a/libdd-trace-utils/tests/test_send_data.rs b/libdd-trace-utils/tests/test_send_data.rs index 39a10b58ff..a3f3070ff2 100644 --- a/libdd-trace-utils/tests/test_send_data.rs +++ b/libdd-trace-utils/tests/test_send_data.rs @@ -627,4 +627,129 @@ mod tracing_integration_tests { // 2 decoded spans. The checked-in snapshot is the canonical equivalent decoded form. test_agent.assert_snapshot(snapshot_name).await; } + + /// Builds a V1 payload exercising the full attribute surface that the current test-agent + /// (v1.56.0) is able to decode: `AttributeValue::String/Bool/Int/Float`, span links with + /// non-empty attributes, span events with non-empty attributes, plus chunk-level attributes. + /// + /// APMSP-3479 - TODO: `AttributeValue::Bytes`, `AttributeValue::List` and + /// `AttributeValue::KeyValue` are deliberately omitted — `ddapm-test-agent` v1.56.0 rejects + /// Bytes with `400 "Bytes values are not supported yet."` and does not handle List / KeyValue. + /// Add them here once test-agent V1 support catches up. + fn make_v1_full_payload(name_prefix: &str) -> libdd_trace_utils::span::v1::TracerPayloadBytes { + use libdd_trace_utils::span::v1::{ + AttributeValue, AttributeValueBytes, SpanBytes as V1SpanBytes, SpanEventBytes, + SpanKind, SpanLinkBytes, TraceChunkBytes, TracerPayloadBytes, + }; + + // Root span attributes — primitives supported by the test-agent today. + let mut root_attrs: VecMap = VecMap::new(); + root_attrs.insert(bs_v1("http.method"), AttributeValue::String(bs_v1("POST"))); + root_attrs.insert(bs_v1("http.status_code"), AttributeValue::Int(201)); + root_attrs.insert(bs_v1("http.success"), AttributeValue::Bool(true)); + root_attrs.insert(bs_v1("http.duration_ms"), AttributeValue::Float(42.75)); + + // Span link with non-empty attributes (String-only — primitives are enough to exercise + // the encoder/decoder paths for link attribute maps). + let mut link_attrs: VecMap = VecMap::new(); + link_attrs.insert(bs_v1("link.reason"), AttributeValue::String(bs_v1("retry"))); + let span_link = SpanLinkBytes { + trace_id: tid_bytes(0x0123_4567_89ab_cdef, 0xfedc_ba98_7654_3210), + span_id: 0xa0a0_a0a0_a0a0_a0a0, + tracestate: bs_v1("dd=t.tid:abc"), + flags: 1, + attributes: link_attrs, + }; + + // Span event with attributes (String + Int + Bool to exercise typed values). + let mut event_attrs: VecMap = VecMap::new(); + event_attrs.insert( + bs_v1("exception.type"), + AttributeValue::String(bs_v1("RuntimeError")), + ); + event_attrs.insert(bs_v1("exception.code"), AttributeValue::Int(500)); + event_attrs.insert(bs_v1("exception.handled"), AttributeValue::Bool(false)); + let span_event = SpanEventBytes { + time_unix_nano: 1_727_211_691_770_715_042, + name: bs_v1("exception"), + attributes: event_attrs, + }; + + let root_span = V1SpanBytes { + service: bs_v1("test-service"), + name: bs_v1(&format!("{name_prefix}_root")), + resource: bs_v1("/api/users"), + r#type: bs_v1("web"), + span_id: 1, + parent_id: 0, + start: 1_000_000, + duration: 5_000, + span_kind: SpanKind::Server, + env: bs_v1("test-env"), + version: bs_v1("1.2.3"), + component: bs_v1("http"), + attributes: root_attrs, + span_links: thin_vec::thin_vec![span_link], + span_events: thin_vec::thin_vec![span_event], + ..Default::default() + }; + + let mut chunk_attrs = VecMap::new(); + chunk_attrs.insert(bs_v1("_dd.p.dm"), AttributeValue::String(bs_v1("-4"))); + chunk_attrs.insert( + bs_v1("_dd.p.tid"), + AttributeValue::String(bs_v1("0123456789abcdef")), + ); + + let chunk = TraceChunkBytes { + trace_id: tid_bytes(0, 0xfeedface), + priority: Some(1), + origin: bs_v1("synthetics"), + sampling_mechanism: Some(4), + attributes: chunk_attrs, + dropped_trace: false, + spans: vec![root_span], + }; + + TracerPayloadBytes { + language_name: bs_v1("test-lang"), + language_version: bs_v1("2.0"), + tracer_version: bs_v1("1.0"), + runtime_id: bs_v1("test-runtime-id-full"), + env: bs_v1("test-env"), + hostname: bs_v1("test-host"), + app_version: bs_v1("1.2.3"), + attributes: VecMap::new(), + chunks: vec![chunk], + } + } + + /// Big V1 integration test: encodes a payload covering every primitive attribute variant + /// the test-agent supports today (String/Bool/Int/Float) plus non-empty span_link and + /// span_event attribute maps, then POSTs to `/v1.0/traces` and asserts the snapshot. + /// + /// APMSP-3479 - TODO: extend this with `AttributeValue::Bytes`, `AttributeValue::List` + /// and `AttributeValue::KeyValue` once `ddapm-test-agent` v1 support catches up. See + /// [`make_v1_full_payload`] for the payload builder and the omitted variants. + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn compare_v1_full_payload_snapshot_test() { + use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_payload_v1; + + let relative_snapshot_path = "libdd-trace-utils/tests/snapshots/"; + let snapshot_name = "compare_v1_full_payload_snapshot_test"; + let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await; + let uri = test_agent + .get_uri_for_endpoint("v1.0/traces", Some(snapshot_name)) + .await; + + test_agent.start_session(snapshot_name, None).await; + + let payload = make_v1_full_payload("test_send_data_v1_full"); + let encoded = to_vec_from_payload_v1(&payload); + + post_v1_payload(uri, encoded).await; + + test_agent.assert_snapshot(snapshot_name).await; + } }