From 0efcc7c518513cedd12b5201856106a3765663ca Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Wed, 24 Jun 2026 14:02:06 +0200 Subject: [PATCH 1/3] feat: replace v04 usage by v1 in sidecar --- datadog-sidecar-ffi/src/lib.rs | 46 ++++++++++ datadog-sidecar/src/service/blocking.rs | 24 ++++++ datadog-sidecar/src/service/sender.rs | 27 ++++++ .../src/service/sidecar_interface.rs | 32 +++++++ datadog-sidecar/src/service/sidecar_server.rs | 83 ++++++++++++++++++- .../src/trace_exporter/trace_serializer.rs | 8 +- libdd-trace-utils/src/send_data/mod.rs | 27 ++++++ libdd-trace-utils/src/trace_utils.rs | 62 +++++--------- libdd-trace-utils/src/tracer_payload.rs | 31 +++++-- 9 files changed, 287 insertions(+), 53 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 25fa4550bc..566ef08d38 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -1109,6 +1109,52 @@ pub unsafe extern "C" fn ddog_sidecar_send_trace_v04_bytes( MaybeError::None } +/// Sends a v0.4-encoded trace to the sidecar via shared memory; the sidecar will re-encode +/// it as a V1 msgpack payload before forwarding to the agent. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_send_trace_v1_shm( + transport: &mut Box, + instance_id: &InstanceId, + shm_handle: Box, + len: usize, + tracer_header_tags: &TracerHeaderTags, +) -> MaybeError { + let tracer_header_tags = try_c!(tracer_header_tags.try_into()); + + try_c!(blocking::send_trace_v1_shm( + transport, + instance_id, + *shm_handle, + len, + tracer_header_tags, + )); + + MaybeError::None +} + +/// Sends a v0.4-encoded trace as bytes to the sidecar; the sidecar will re-encode it as a +/// V1 msgpack payload before forwarding to the agent. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_send_trace_v1_bytes( + transport: &mut Box, + instance_id: &InstanceId, + data: ffi::CharSlice, + tracer_header_tags: &TracerHeaderTags, +) -> MaybeError { + let tracer_header_tags = try_c!(tracer_header_tags.try_into()); + + try_c!(blocking::send_trace_v1_bytes( + transport, + instance_id, + data.as_bytes().to_vec(), + tracer_header_tags, + )); + + MaybeError::None +} + #[no_mangle] #[allow(clippy::missing_safety_doc)] #[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index 80a9257384..9506e575bd 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -294,6 +294,30 @@ pub fn send_trace_v04_shm( Ok(()) } +/// Sends a v0.4-encoded trace as bytes; the sidecar re-encodes it as V1 before forwarding. +pub fn send_trace_v1_bytes( + transport: &mut SidecarTransport, + instance_id: &InstanceId, + data: Vec, + headers: SerializedTracerHeaderTags, +) -> io::Result<()> { + lock_sender(transport)?.send_trace_v1_bytes(instance_id.clone(), data, headers); + Ok(()) +} + +/// Sends a v0.4-encoded trace via shared memory; the sidecar re-encodes it as V1 before +/// forwarding. +pub fn send_trace_v1_shm( + transport: &mut SidecarTransport, + instance_id: &InstanceId, + handle: ShmHandle, + len: usize, + headers: SerializedTracerHeaderTags, +) -> io::Result<()> { + lock_sender(transport)?.send_trace_v1_shm(instance_id.clone(), handle, len, headers); + Ok(()) +} + /// Sends raw data from shared memory to the debugger endpoint. pub fn send_debugger_data_shm( transport: &mut SidecarTransport, diff --git a/datadog-sidecar/src/service/sender.rs b/datadog-sidecar/src/service/sender.rs index e88720e23f..e66e7396eb 100644 --- a/datadog-sidecar/src/service/sender.rs +++ b/datadog-sidecar/src/service/sender.rs @@ -380,6 +380,33 @@ impl SidecarSender { .try_send_send_trace_v04_bytes(instance_id, data, headers); } + pub fn send_trace_v1_shm( + &mut self, + instance_id: InstanceId, + handle: ShmHandle, + len: usize, + headers: SerializedTracerHeaderTags, + ) { + if !self.try_drain_outbox() { + return; + } + self.channel + .try_send_send_trace_v1_shm(instance_id, handle, len, headers); + } + + pub fn send_trace_v1_bytes( + &mut self, + instance_id: InstanceId, + data: Vec, + headers: SerializedTracerHeaderTags, + ) { + if !self.try_drain_outbox() { + return; + } + self.channel + .try_send_send_trace_v1_bytes(instance_id, data, headers); + } + pub fn send_debugger_data_shm( &mut self, instance_id: InstanceId, diff --git a/datadog-sidecar/src/service/sidecar_interface.rs b/datadog-sidecar/src/service/sidecar_interface.rs index 6f6244b258..7e54d260d2 100644 --- a/datadog-sidecar/src/service/sidecar_interface.rs +++ b/datadog-sidecar/src/service/sidecar_interface.rs @@ -129,6 +129,38 @@ pub trait SidecarInterface { headers: SerializedTracerHeaderTags, ); + /// Sends a v0.4-encoded trace via shared memory; the sidecar re-encodes it as a V1 + /// msgpack payload before forwarding to the agent. Use this when the upstream SDK only + /// speaks v0.4 but the agent advertises `/v1.0/traces`. + /// + /// # Arguments + /// + /// * `instance_id` - The ID of the instance. + /// * `handle` - The handle to the shared memory. + /// * `len` - The size of the shared memory data. + /// * `headers` - The serialized headers from the tracer. + async fn send_trace_v1_shm( + instance_id: InstanceId, + #[SerializedHandle] handle: ShmHandle, + len: usize, + headers: SerializedTracerHeaderTags, + ); + + /// Sends a v0.4-encoded trace as bytes; the sidecar re-encodes it as a V1 msgpack payload + /// before forwarding to the agent. Use this when the upstream SDK only speaks v0.4 but + /// the agent advertises `/v1.0/traces`. + /// + /// # Arguments + /// + /// * `instance_id` - The ID of the instance. + /// * `data` - The v0.4 trace data serialized as bytes. + /// * `headers` - The serialized headers from the tracer. + async fn send_trace_v1_bytes( + instance_id: InstanceId, + data: Vec, + headers: SerializedTracerHeaderTags, + ); + /// Transfers raw data to a live-debugger endpoint. /// /// # Arguments diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 7a5e095d13..5cdac56cc3 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -249,6 +249,30 @@ impl SidecarServer { data: tinybytes::Bytes, target: &Endpoint, retry_interval: u64, + ) { + self.send_trace(headers, data, target, retry_interval, TraceEncoding::V04) + } + + /// Re-encode entry point for the V1 path. Input bytes are still v0.4 msgpack from the SDK; + /// the [`TraceEncoding::V1`] tag tells [`SendData`] to encode the wire payload as V1 before + /// forwarding to the agent. + fn send_trace_v1( + &self, + headers: &SerializedTracerHeaderTags, + data: tinybytes::Bytes, + target: &Endpoint, + retry_interval: u64, + ) { + self.send_trace(headers, data, target, retry_interval, TraceEncoding::V1) + } + + fn send_trace( + &self, + headers: &SerializedTracerHeaderTags, + data: tinybytes::Bytes, + target: &Endpoint, + retry_interval: u64, + encoding: TraceEncoding, ) { let headers: TracerHeaderTags = match headers.try_into() { Ok(headers) => headers, @@ -265,7 +289,7 @@ impl SidecarServer { headers ); - match decode_to_trace_chunks(data, TraceEncoding::V04) { + match decode_to_trace_chunks(data, encoding) { Ok((payload, size)) => { trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}"); let mut data = SendData::new( @@ -898,6 +922,63 @@ impl SidecarInterface for ConnectionSidecarHandler { } } + async fn send_trace_v1_shm( + &self, + _peer: PeerCredentials, + instance_id: InstanceId, + handle: ShmHandle, + _len: usize, + headers: SerializedTracerHeaderTags, + ) { + self.track_instance(&instance_id); + let session = self.server.get_session(&instance_id.session_id); + let trace_config = session.get_trace_config(); + if let Some(endpoint) = trace_config.endpoint.clone() { + let server = self.server.clone(); + let retry_interval = trace_config.retry_interval; + tokio::spawn(async move { + match handle.map() { + Ok(mapped) => { + let bytes = tinybytes::Bytes::from(mapped); + server.send_trace_v1(&headers, bytes, &endpoint, retry_interval); + } + Err(e) => error!("Failed mapping shared trace data memory: {}", e), + } + }); + } else { + warn!( + "Received trace data ({handle:?}) for missing session {}", + instance_id.session_id + ); + } + } + + async fn send_trace_v1_bytes( + &self, + _peer: PeerCredentials, + instance_id: InstanceId, + data: Vec, + headers: SerializedTracerHeaderTags, + ) { + self.track_instance(&instance_id); + let session = self.server.get_session(&instance_id.session_id); + let trace_config = session.get_trace_config(); + + if let Some(endpoint) = trace_config.endpoint.clone() { + let server = self.server.clone(); + let retry_interval = trace_config.retry_interval; + tokio::spawn(async move { + let bytes = tinybytes::Bytes::from(data); + server.send_trace_v1(&headers, bytes, &endpoint, retry_interval); + }); + } else { + warn!( + "Received trace data for missing session {}", + instance_id.session_id + ); + } + } + async fn send_debugger_data_shm( &self, _peer: PeerCredentials, diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 7cfd62a5a0..30dc497c83 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -17,7 +17,7 @@ use libdd_trace_utils::msgpack_encoder; use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::{self, TracerHeaderTags}; use libdd_trace_utils::tracer_metadata::TracerMetadata; -use libdd_trace_utils::tracer_payload::{self, TraceEncoding}; +use libdd_trace_utils::tracer_payload::{self}; /// Minimal capacity of fresh buffers allocated to encode traces, in bytes. const MIN_BUFFER_CAPACITY: usize = 1024; @@ -79,11 +79,9 @@ impl TraceSerializer { }; match output_format { TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)), - TraceExporterOutputFormat::V04 => { - trace_utils::collect_trace_chunks(traces, TraceEncoding::V04).map_err(map_err) - } + TraceExporterOutputFormat::V04 => Ok(tracer_payload::TraceChunks::V04(traces)), TraceExporterOutputFormat::V05 => { - trace_utils::collect_trace_chunks(traces, TraceEncoding::V05).map_err(map_err) + trace_utils::convert_trace_chunks_v04_to_v05(traces).map_err(map_err) } } } diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index 53d14d8ecd..5a219aa665 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -394,6 +394,29 @@ impl SendData { endpoint.as_ref(), )); } + // TracerPayloadCollection::V1(payload) => { + // // V0.4-shaped spans re-encoded as a V1 msgpack payload at send time. Used by + // // the sidecar when the upstream SDK only speaks v0.4 but the agent advertises + // // `/v1.0/traces`. `extract_payload_attrs` will pull env/hostname/app_version + // // from span meta tags since the SDK propagates them there for v0.4 payloads. + // #[allow(clippy::unwrap_used)] + // let chunks = u64::try_from(self.tracer_payloads.size()).unwrap(); + // let mut headers = self.headers.clone(); + // headers.reserve(2); + // headers.insert(DATADOG_TRACE_COUNT, chunks.into()); + // headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK); + + // let metadata = crate::tracer_metadata::TracerMetadata::default(); + // let payload = msgpack_encoder::v1::to_vec_from_payload_v1(payload); + + // futures.push(self.send_payload( + // capabilities, + // chunks, + // payload, + // headers, + // endpoint.as_ref(), + // )); + // } TracerPayloadCollection::V05(payload) => { #[allow(clippy::unwrap_used)] let chunks = u64::try_from(self.tracer_payloads.size()).unwrap(); @@ -537,6 +560,10 @@ mod tests { msgpack_encoder::v04::to_encoded_byte_len(payloads) as usize } TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(), + // TracerPayloadCollection::V1(payloads) => { + // let metadata = crate::tracer_metadata::TracerMetadata::default(); + // msgpack_encoder::v1::to_encoded_byte_len(payloads, &metadata) as usize + // } } } diff --git a/libdd-trace-utils/src/trace_utils.rs b/libdd-trace-utils/src/trace_utils.rs index 851ac54beb..f0398235f6 100644 --- a/libdd-trace-utils/src/trace_utils.rs +++ b/libdd-trace-utils/src/trace_utils.rs @@ -7,7 +7,7 @@ use crate::span::v05::dict::SharedDict; use crate::span::{v05, TraceData}; pub use crate::tracer_header_tags::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; -use crate::tracer_payload::{self, TraceChunks, TraceEncoding}; +use crate::tracer_payload::{self, TraceChunks}; use anyhow::anyhow; use bytes::buf::Reader; use bytes::Buf; @@ -583,25 +583,28 @@ pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) { } } -pub fn collect_trace_chunks( +/// Converts v0.4-shaped span chunks into the v0.5 wire representation. +/// +/// v0.5 deduplicates every string field across the whole payload through a shared dictionary +/// and replaces them with `u32` indices. This walks each span via [`v05::from_v04_span`], +/// interning strings into the [`SharedDict`] as it goes, and returns the resulting +/// `(dict, traces)` pair wrapped in [`TraceChunks::V05`]. +/// +/// Returns `Err` if any span fails to convert (e.g. unsupported field value); the partial +/// dictionary built so far is discarded. +pub fn convert_trace_chunks_v04_to_v05( traces: Vec>>, - format: TraceEncoding, ) -> anyhow::Result> { - match format { - TraceEncoding::V05 => { - let mut shared_dict = SharedDict::default(); - let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); - for trace in traces { - let v05_trace = trace - .into_iter() - .map(|span| v05::from_v04_span(span, &mut shared_dict)) - .collect::>>()?; - v05_traces.push(v05_trace); - } - Ok(TraceChunks::V05((shared_dict, v05_traces))) - } - TraceEncoding::V04 => Ok(TraceChunks::V04(traces)), + let mut shared_dict = SharedDict::default(); + let mut v05_traces: Vec> = Vec::with_capacity(traces.len()); + for trace in traces { + let v05_trace = trace + .into_iter() + .map(|span| v05::from_v04_span(span, &mut shared_dict)) + .collect::>>()?; + v05_traces.push(v05_trace); } + Ok(TraceChunks::V05((shared_dict, v05_traces))) } pub fn collect_pb_trace_chunks( @@ -1118,10 +1121,10 @@ mod tests { } #[test] - fn test_collect_trace_chunks_v05() { + fn test_convert_trace_chunks_v04_to_v05() { let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)]; - let collection = collect_trace_chunks(vec![chunk], TraceEncoding::V05).unwrap(); + let collection = convert_trace_chunks_v04_to_v05(vec![chunk]).unwrap(); let (dict, traces) = match collection { TraceChunks::V05(payload) => payload, @@ -1192,27 +1195,6 @@ mod tests { ); } - #[test] - fn test_collect_trace_chunks_v04() { - let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)]; - - let collection = collect_trace_chunks(vec![chunk], TraceEncoding::V04).unwrap(); - - let traces = match collection { - TraceChunks::V04(traces) => traces, - _ => panic!("Unexpected type"), - }; - - assert_eq!(traces.len(), 1); - assert_eq!(traces[0].len(), 1); - let span = &traces[0][0]; - assert_eq!(span.trace_id, 123); - assert_eq!(span.span_id, 456); - assert_eq!(span.parent_id, 789); - assert_eq!(span.start, 1); - assert_eq!(span.error, 0); - } - #[test] fn test_rmp_serde_deserialize_meta_with_null_values() { // Create a JSON representation with null value in meta diff --git a/libdd-trace-utils/src/tracer_payload.rs b/libdd-trace-utils/src/tracer_payload.rs index 30752cabdf..2bd419ef10 100644 --- a/libdd-trace-utils/src/tracer_payload.rs +++ b/libdd-trace-utils/src/tracer_payload.rs @@ -3,8 +3,9 @@ use crate::span::v05::dict::SharedDict; use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData}; -use crate::trace_utils::collect_trace_chunks; +use crate::trace_utils::convert_trace_chunks_v04_to_v05; use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads}; +use anyhow::Ok; use libdd_trace_protobuf::pb; use std::cmp::Ordering; use std::iter::Iterator; @@ -19,6 +20,9 @@ pub enum TraceEncoding { V04, /// v0.5 encoding (TracerPayloadV05). V05, + /// V1 encoding. Input is decoded as v0.4 (same span shape) and re-encoded as V1 msgpack + /// when sent to the agent. + V1, } #[derive(Debug)] @@ -29,6 +33,7 @@ pub enum TraceChunks { V05((SharedDict, Vec>)), /// Collection of v0.4 spans to be serialized as a V1 msgpack payload. V1(Vec>>), + // V1(Vec>>), } impl TraceChunks { @@ -38,6 +43,7 @@ impl TraceChunks { TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces), // V1 uses the same underlying span structure as V04. TraceChunks::V1(traces) => TracerPayloadCollection::V04(traces), + // TraceChunks::V1(traces) => TracerPayloadCollection::V1(traces), } } } @@ -62,6 +68,8 @@ pub enum TracerPayloadCollection { V04(Vec>), /// Collection of TraceChunkSpan with de-duplicated strings. V05((SharedDictBytes, Vec>)), + // /// V0.4-shaped spans that must be serialized as a V1 msgpack payload on send. + // V1(Vec>), } impl TracerPayloadCollection { @@ -92,6 +100,11 @@ impl TracerPayloadCollection { dest.append(src) } } + // TracerPayloadCollection::V1(dest) => { + // if let TracerPayloadCollection::V1(src) = other { + // dest.append(src) + // } + // } // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). #[allow(clippy::unimplemented)] TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"), @@ -144,6 +157,7 @@ impl TracerPayloadCollection { } TracerPayloadCollection::V04(collection) => collection.len(), TracerPayloadCollection::V05((_, collection)) => collection.len(), + // TracerPayloadCollection::V1(collection) => collection.len(), } } } @@ -228,13 +242,16 @@ pub fn decode_to_trace_chunks( data: libdd_tinybytes::Bytes, encoding_type: TraceEncoding, ) -> Result<(TraceChunks, usize), anyhow::Error> { - let (data, size) = match encoding_type { - TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data), - TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data), + match encoding_type { + TraceEncoding::V04 | TraceEncoding::V1 => { + let (data, size) = msgpack_decoder::v04::from_bytes(data).map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; + Ok((TraceChunks::V04(data), size)) + } + TraceEncoding::V05 => { + let (data, size) = msgpack_decoder::v05::from_bytes(data).map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; + Ok((convert_trace_chunks_v04_to_v05(data)?, size)) + } } - .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; - - Ok((collect_trace_chunks(data, encoding_type)?, size)) } #[cfg(test)] From 38b5e6676762e3ad6f27444bcb3be5e1433ffec2 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 29 Jun 2026 13:45:00 +0200 Subject: [PATCH 2/3] feat: add v1 decoder --- .../src/trace_exporter/trace_serializer.rs | 6 +- libdd-trace-utils/src/msgpack_decoder/mod.rs | 1 + .../src/msgpack_decoder/v1/mod.rs | 585 ++++++++++++++++++ .../src/msgpack_decoder/v1/span.rs | 332 ++++++++++ libdd-trace-utils/src/send_data/mod.rs | 48 +- libdd-trace-utils/src/tracer_payload.rs | 75 ++- 6 files changed, 1000 insertions(+), 47 deletions(-) create mode 100644 libdd-trace-utils/src/msgpack_decoder/v1/mod.rs create mode 100644 libdd-trace-utils/src/msgpack_decoder/v1/span.rs diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 30dc497c83..8ee6246a69 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -78,7 +78,7 @@ impl TraceSerializer { TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) }; match output_format { - TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)), + TraceExporterOutputFormat::V1 => todo!("Implement V1 trace collection"), TraceExporterOutputFormat::V04 => Ok(tracer_payload::TraceChunks::V04(traces)), TraceExporterOutputFormat::V05 => { trace_utils::convert_trace_chunks_v04_to_v05(traces).map_err(map_err) @@ -126,8 +126,8 @@ impl TraceSerializer { .map_err(TraceExporterError::Serialization)?; buff } - tracer_payload::TraceChunks::V1(p) => { - msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata) + tracer_payload::TraceChunks::V1(_) => { + todo!("Implement V1 payload serialization with metadata : {:?}", metadata) } }; self.previous_serialised_len diff --git a/libdd-trace-utils/src/msgpack_decoder/mod.rs b/libdd-trace-utils/src/msgpack_decoder/mod.rs index 6a07361389..d22f2fd788 100644 --- a/libdd-trace-utils/src/msgpack_decoder/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/mod.rs @@ -4,3 +4,4 @@ pub mod decode; pub mod v04; pub mod v05; +pub mod v1; diff --git a/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs new file mode 100644 index 0000000000..0dd3046aeb --- /dev/null +++ b/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs @@ -0,0 +1,585 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub(super) mod span; + +use crate::msgpack_decoder::decode::buffer::Buffer; +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::v1::{TraceChunk, TracerPayload, TracerPayloadBytes, TracerPayloadSlice}; +use crate::span::DeserializableTraceData; +use rmp::decode; +use std::borrow::Borrow; + +// Integer keys used by the V1 wire format. Kept in sync with the encoder side +// (`msgpack_encoder::v1::{trace_key, chunk_key, SpanKey, SpanLinkKey, SpanEventKey, AnyValueKey}`). + +pub(super) mod trace_key { + pub const LANGUAGE_NAME: u8 = 3; + pub const LANGUAGE_VERSION: u8 = 4; + pub const TRACER_VERSION: u8 = 5; + pub const RUNTIME_ID: u8 = 6; + pub const ENV_REF: u8 = 7; + pub const HOSTNAME_REF: u8 = 8; + pub const APP_VERSION_REF: u8 = 9; + pub const ATTRIBUTES: u8 = 10; + pub const CHUNKS: u8 = 11; +} + +pub(super) mod chunk_key { + pub const PRIORITY: u8 = 1; + pub const ORIGIN: u8 = 2; + pub const ATTRIBUTES: u8 = 3; + pub const SPANS: u8 = 4; + pub const DROPPED_TRACE: u8 = 5; + pub const TRACE_ID: u8 = 6; + pub const SAMPLING_MECHANISM: u8 = 7; +} + +pub(super) mod span_key { + pub const SERVICE: u8 = 1; + pub const NAME: u8 = 2; + pub const RESOURCE: u8 = 3; + pub const SPAN_ID: u8 = 4; + pub const PARENT_ID: u8 = 5; + pub const START: u8 = 6; + pub const DURATION: u8 = 7; + pub const ERROR: u8 = 8; + pub const ATTRIBUTES: u8 = 9; + pub const TYPE: u8 = 10; + pub const SPAN_LINKS: u8 = 11; + pub const SPAN_EVENTS: u8 = 12; + pub const ENV: u8 = 13; + pub const VERSION: u8 = 14; + pub const COMPONENT: u8 = 15; + pub const KIND: u8 = 16; +} + +pub(super) mod span_link_key { + pub const TRACE_ID: u8 = 1; + pub const SPAN_ID: u8 = 2; + pub const ATTRIBUTES: u8 = 3; + pub const TRACE_STATE: u8 = 4; + pub const FLAGS: u8 = 5; +} + +pub(super) mod span_event_key { + pub const TIME: u8 = 1; + pub const NAME: u8 = 2; + pub const ATTRIBUTES: u8 = 3; +} + +pub(super) const ANY_VALUE_KEY_STRING: u8 = 1; +pub(super) const ANY_VALUE_KEY_BOOL: u8 = 2; +pub(super) const ANY_VALUE_KEY_DOUBLE: u8 = 3; +pub(super) const ANY_VALUE_KEY_INT64: u8 = 4; +pub(super) const ANY_VALUE_KEY_BYTES: u8 = 5; +pub(super) const ANY_VALUE_KEY_ARRAY: u8 = 6; +pub(super) const ANY_VALUE_KEY_KEY_VALUE_LIST: u8 = 7; + +/// Number of msgpack items consumed per `[type, value]` pair in a typed `Array`. +pub(super) const TYPED_VALUE_STRIDE: u32 = 2; + +/// Number of msgpack items consumed per `[key, type, value]` triplet in a typed attributes map. +pub(super) const FLAT_ATTR_STRIDE: u32 = 3; + +/// Streaming string intern table built up as the payload is decoded. +/// +/// V1 strings are encoded inline the first time they appear (as msgpack `str`), and as a +/// msgpack `uint` reference on every subsequent occurrence. ID 0 is reserved for the empty +/// string and is pre-inserted on construction. +pub(super) struct StringTable +where + T::Text: Clone, +{ + seen: Vec, +} + +impl StringTable +where + T::Text: Clone, +{ + pub(super) fn new() -> Self { + Self { + seen: vec![T::Text::default()], + } + } + + /// Resolves a string reference by ID (encoded inline as msgpack `uint`). + fn resolve(&self, id: u64) -> Result { + usize::try_from(id) + .ok() + .and_then(|i| self.seen.get(i).cloned()) + .ok_or_else(|| { + DecodeError::InvalidFormat(format!( + "V1 string table reference out of range: id={id}, table_len={}", + self.seen.len() + )) + }) + } + + /// Records a freshly-read inline string and returns it (cloned for reuse). + fn record(&mut self, s: T::Text) -> T::Text { + self.seen.push(s.clone()); + s + } +} + +/// Reads a string-or-reference value at the current buffer position. +/// +/// Decides based on the next msgpack marker: +/// - `str`/`fixstr` → read and intern, return the value +/// - any unsigned int marker → resolve the table reference +pub(super) fn read_interned_string( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result +where + T::Text: Clone, +{ + let slice: &[u8] = buf.as_mut_slice(); + let marker_byte = *slice.first().ok_or_else(|| { + DecodeError::InvalidFormat("Unexpected end of V1 buffer when reading interned string".to_owned()) + })?; + + // msgpack markers: + // fixstr : 0xa0..=0xbf + // str8/str16/str32 : 0xd9, 0xda, 0xdb + // fixint (positive): 0x00..=0x7f + // uint8/16/32/64 : 0xcc, 0xcd, 0xce, 0xcf + let is_string = matches!(marker_byte, 0xa0..=0xbf | 0xd9 | 0xda | 0xdb); + let is_uint = matches!(marker_byte, 0x00..=0x7f | 0xcc | 0xcd | 0xce | 0xcf); + + if is_string { + let s = buf.read_string()?; + Ok(table.record(s)) + } else if is_uint { + let id: u64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat( + "V1 interned string reference uint read failure".to_owned(), + ) + })?; + table.resolve(id) + } else { + Err(DecodeError::InvalidFormat(format!( + "Unexpected msgpack marker 0x{marker_byte:02x} for V1 interned string" + ))) + } +} + +/// Decodes a V1 msgpack payload from owned bytes into a [`TracerPayloadBytes`]. +/// +/// # Returns +/// +/// * `Ok((payload, payload_size))` — the decoded payload and the number of bytes consumed from +/// the buffer. +/// * `Err(DecodeError)` — if the payload is malformed. +/// +/// # Errors +/// +/// Returns an error for any malformed map / array length, unknown map key, missing required +/// field, or any embedded msgpack read failure. +pub fn from_bytes( + data: libdd_tinybytes::Bytes, +) -> Result<(TracerPayloadBytes, usize), DecodeError> { + from_buffer(&mut Buffer::new(data)) +} + +/// Decodes a V1 msgpack payload from a borrowed slice into a [`TracerPayloadSlice`]. +/// The resulting payload borrows from the input buffer (same lifetime). +pub fn from_slice(data: &[u8]) -> Result<(TracerPayloadSlice<'_>, usize), DecodeError> { + from_buffer(&mut Buffer::new(data)) +} + +/// Generic over the deserialization mode (owned `BytesData` or borrowed `SliceData`). +#[allow(clippy::type_complexity)] +pub fn from_buffer( + data: &mut Buffer, +) -> Result<(TracerPayload, usize), DecodeError> +where + T::Text: Clone, +{ + let start_len = data.len(); + let mut table = StringTable::::new(); + let payload = decode_payload(data, &mut table)?; + let consumed = start_len - data.len(); + Ok((payload, consumed)) +} + +/// Decodes the top-level V1 payload map: tracer metadata fields + chunks array. +fn decode_payload( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("Unable to read V1 payload map len".to_owned()) + })?; + + let mut payload = TracerPayload::::default(); + let mut saw_chunks = false; + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 payload key (u8) read failure".to_owned()) + })?; + match key { + trace_key::CHUNKS => { + payload.chunks = decode_chunks(buf, table)?; + saw_chunks = true; + } + trace_key::LANGUAGE_NAME => payload.language_name = read_interned_string(buf, table)?, + trace_key::LANGUAGE_VERSION => { + payload.language_version = read_interned_string(buf, table)? + } + trace_key::TRACER_VERSION => { + payload.tracer_version = read_interned_string(buf, table)? + } + trace_key::RUNTIME_ID => payload.runtime_id = read_interned_string(buf, table)?, + trace_key::ENV_REF => payload.env = read_interned_string(buf, table)?, + trace_key::HOSTNAME_REF => payload.hostname = read_interned_string(buf, table)?, + trace_key::APP_VERSION_REF => payload.app_version = read_interned_string(buf, table)?, + trace_key::ATTRIBUTES => { + payload.attributes = span::read_attributes_map(buf, table)?; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 payload key: {unknown}" + ))); + } + } + } + + if !saw_chunks { + return Err(DecodeError::InvalidFormat( + "V1 payload is missing the chunks field".to_owned(), + )); + } + + Ok(payload) +} + +fn decode_chunks( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result>, DecodeError> +where + T::Text: Clone, +{ + let count = decode::read_array_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 chunks array len read failure".to_owned()))?; + let mut chunks = Vec::with_capacity(count as usize); + for _ in 0..count { + chunks.push(decode_chunk(buf, table)?); + } + Ok(chunks) +} + +fn decode_chunk( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 chunk map len read failure".to_owned()))?; + let mut chunk = TraceChunk::::default(); + let mut saw_trace_id = false; + let mut saw_spans = false; + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 chunk key (u8) read failure".to_owned()) + })?; + match key { + chunk_key::TRACE_ID => { + let len = decode::read_bin_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 chunk trace_id bin len read failure".to_owned()) + })?; + if len != 16 { + return Err(DecodeError::InvalidFormat(format!( + "V1 chunk trace_id must be 16 bytes, got {len}" + ))); + } + let bytes = buf.try_slice_and_advance(16).ok_or_else(|| { + DecodeError::InvalidFormat("V1 chunk trace_id payload truncated".to_owned()) + })?; + let slice: &[u8] = bytes.borrow(); + chunk.trace_id.copy_from_slice(slice); + saw_trace_id = true; + } + chunk_key::SPANS => { + let count = decode::read_array_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 chunk spans array len read failure".to_owned()) + })?; + let mut spans = Vec::with_capacity(count as usize); + for _ in 0..count { + spans.push(span::decode_span(buf, table)?); + } + chunk.spans = spans; + saw_spans = true; + } + chunk_key::ORIGIN => chunk.origin = read_interned_string(buf, table)?, + chunk_key::PRIORITY => { + let v: i64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 chunk priority read failure".to_owned()) + })?; + chunk.priority = Some(v as i32); + } + chunk_key::SAMPLING_MECHANISM => { + let v: u64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat( + "V1 chunk sampling_mechanism read failure".to_owned(), + ) + })?; + chunk.sampling_mechanism = Some(v as u32); + } + chunk_key::ATTRIBUTES => { + chunk.attributes = span::read_attributes_map(buf, table)?; + } + chunk_key::DROPPED_TRACE => { + chunk.dropped_trace = decode::read_bool(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat( + "V1 chunk dropped_trace bool read failure".to_owned(), + ) + })?; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 chunk key: {unknown}" + ))); + } + } + } + + if !saw_trace_id { + return Err(DecodeError::InvalidFormat( + "V1 chunk is missing trace_id".to_owned(), + )); + } + if !saw_spans { + return Err(DecodeError::InvalidFormat( + "V1 chunk is missing spans array".to_owned(), + )); + } + + Ok(chunk) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::msgpack_encoder::v1::to_vec_from_payload_v1; + use crate::span::v1::{ + AttributeValue, Span as V1Span, SpanBytes as V1SpanBytes, SpanKind, TraceChunkBytes, + TracerPayloadBytes, + }; + use crate::span::vec_map::VecMap; + use libdd_tinybytes::{Bytes, BytesString}; + + fn bs(s: &str) -> BytesString { + BytesString::from_slice(s.as_bytes()).expect("test string must fit in BytesString") + } + + fn sample_payload() -> TracerPayloadBytes { + let mut attrs = VecMap::>::new(); + attrs.insert(bs("http.method"), AttributeValue::String(bs("GET"))); + attrs.insert(bs("http.status"), AttributeValue::Int(200)); + attrs.insert(bs("is_root"), AttributeValue::Bool(true)); + attrs.insert(bs("ratio"), AttributeValue::Float(0.75)); + attrs.insert( + bs("ids"), + AttributeValue::List(vec![AttributeValue::Int(1), AttributeValue::Int(2)]), + ); + + let span = V1Span { + service: bs("svc"), + name: bs("GET /users"), + resource: bs("/users"), + r#type: bs("web"), + span_id: 42, + parent_id: 7, + start: 1_700_000_000_000, + duration: 1_500, + error: true, + span_kind: SpanKind::Server, + env: bs("prod"), + version: bs("1.2.3"), + component: bs("net/http"), + attributes: attrs, + ..Default::default() + }; + + let mut chunk_attrs = VecMap::>::new(); + chunk_attrs.insert(bs("_dd.p.dm"), AttributeValue::String(bs("-1"))); + + let chunk = TraceChunkBytes { + trace_id: [1u8; 16], + priority: Some(1), + origin: bs("synthetic"), + sampling_mechanism: Some(2), + dropped_trace: false, + attributes: chunk_attrs, + spans: vec![span], + }; + + TracerPayloadBytes { + language_name: bs("rust"), + language_version: bs("1.87"), + tracer_version: bs("9.9.9"), + runtime_id: bs("abcd-1234"), + env: bs("prod"), + hostname: bs("host-1"), + app_version: bs("1.2.3"), + chunks: vec![chunk], + ..Default::default() + } + } + + #[test] + fn roundtrip_full_payload() { + let original = sample_payload(); + let bytes = to_vec_from_payload_v1(&original); + let payload_len = bytes.len(); + let (decoded, consumed) = + from_bytes(Bytes::from(bytes)).expect("decoder should succeed on encoder output"); + + assert_eq!(consumed, payload_len, "decoder should consume all bytes"); + + // Tracer-level metadata + assert_eq!(decoded.language_name.as_str(), "rust"); + assert_eq!(decoded.language_version.as_str(), "1.87"); + assert_eq!(decoded.tracer_version.as_str(), "9.9.9"); + assert_eq!(decoded.runtime_id.as_str(), "abcd-1234"); + assert_eq!(decoded.env.as_str(), "prod"); + assert_eq!(decoded.hostname.as_str(), "host-1"); + assert_eq!(decoded.app_version.as_str(), "1.2.3"); + + // Chunk + assert_eq!(decoded.chunks.len(), 1); + let chunk = &decoded.chunks[0]; + assert_eq!(chunk.trace_id, [1u8; 16]); + assert_eq!(chunk.priority, Some(1)); + assert_eq!(chunk.sampling_mechanism, Some(2)); + assert_eq!(chunk.origin.as_str(), "synthetic"); + assert_eq!(chunk.attributes.len(), 1); + + // Span + assert_eq!(chunk.spans.len(), 1); + let span = &chunk.spans[0]; + assert_eq!(span.service.as_str(), "svc"); + assert_eq!(span.name.as_str(), "GET /users"); + assert_eq!(span.resource.as_str(), "/users"); + assert_eq!(span.r#type.as_str(), "web"); + assert_eq!(span.span_id, 42); + assert_eq!(span.parent_id, 7); + assert_eq!(span.start, 1_700_000_000_000); + assert_eq!(span.duration, 1_500); + assert!(span.error); + assert_eq!(span.span_kind, SpanKind::Server); + assert_eq!(span.env.as_str(), "prod"); + assert_eq!(span.version.as_str(), "1.2.3"); + assert_eq!(span.component.as_str(), "net/http"); + assert_eq!(span.attributes.len(), 5); + } + + #[test] + fn empty_payload_roundtrip() { + let original = TracerPayloadBytes::default(); + let bytes = to_vec_from_payload_v1(&original); + let (decoded, _) = + from_bytes(Bytes::from(bytes)).expect("decoder should succeed on empty payload"); + assert!(decoded.chunks.is_empty()); + assert!(decoded.language_name.as_str().is_empty()); + } + + #[test] + fn missing_chunks_field_is_rejected() { + // Manually encode a payload map with only one entry (env), no chunks field. + // `0x81` = fixmap len 1, key 0x07 (ENV_REF), value = inline str "x" (`0xa1 0x78`). + let bytes = vec![0x81, 0x07, 0xa1, 0x78]; + let err = from_bytes(Bytes::from(bytes)).expect_err("missing chunks must error"); + assert!(matches!(err, DecodeError::InvalidFormat(_))); + } + + #[test] + fn string_interning_resolves_across_chunks() { + // Two chunks sharing the same service name. The decoded service strings must both + // be "shared" — verifying the streaming string table is preserved across chunks. + let span_a = V1Span { + service: bs("shared"), + name: bs("a"), + span_id: 1, + start: 1, + ..Default::default() + }; + let span_b = V1Span { + service: bs("shared"), + name: bs("b"), + span_id: 2, + start: 1, + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![ + TraceChunkBytes { + trace_id: [1u8; 16], + spans: vec![span_a], + ..Default::default() + }, + TraceChunkBytes { + trace_id: [2u8; 16], + spans: vec![span_b], + ..Default::default() + }, + ], + ..Default::default() + }; + let bytes = to_vec_from_payload_v1(&payload); + let (decoded, _) = + from_bytes(Bytes::from(bytes)).expect("decoder should resolve interned strings"); + assert_eq!(decoded.chunks[0].spans[0].service.as_str(), "shared"); + assert_eq!(decoded.chunks[1].spans[0].service.as_str(), "shared"); + } + + #[test] + fn nested_keyvalue_attribute_roundtrip() { + let mut inner = VecMap::>::new(); + inner.insert(bs("k"), AttributeValue::String(bs("v"))); + let mut attrs = VecMap::>::new(); + attrs.insert(bs("nested"), AttributeValue::KeyValue(inner)); + + let span = V1Span { + service: bs("svc"), + name: bs("op"), + span_id: 1, + start: 1, + attributes: attrs, + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + spans: vec![span], + ..Default::default() + }], + ..Default::default() + }; + let bytes = to_vec_from_payload_v1(&payload); + let (decoded, _) = from_bytes(Bytes::from(bytes)).expect("nested KeyValue roundtrip"); + + let decoded_attrs = &decoded.chunks[0].spans[0].attributes; + match decoded_attrs.get(&bs("nested")) { + Some(AttributeValue::KeyValue(map)) => { + assert_eq!(map.len(), 1); + match map.get(&bs("k")) { + Some(AttributeValue::String(v)) => assert_eq!(v.as_str(), "v"), + _ => panic!("inner value should be String"), + } + } + _ => panic!("attribute should decode as KeyValue"), + } + } +} diff --git a/libdd-trace-utils/src/msgpack_decoder/v1/span.rs b/libdd-trace-utils/src/msgpack_decoder/v1/span.rs new file mode 100644 index 0000000000..a7e8817e6a --- /dev/null +++ b/libdd-trace-utils/src/msgpack_decoder/v1/span.rs @@ -0,0 +1,332 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +// Naming convention: parent module (`v1`) = input wire format being decoded. This file decodes +// a V1 msgpack span into a [`crate::span::v1::Span`]. + +use super::{ + read_interned_string, span_event_key, span_key, span_link_key, StringTable, + ANY_VALUE_KEY_ARRAY, ANY_VALUE_KEY_BOOL, ANY_VALUE_KEY_BYTES, ANY_VALUE_KEY_DOUBLE, + ANY_VALUE_KEY_INT64, ANY_VALUE_KEY_KEY_VALUE_LIST, ANY_VALUE_KEY_STRING, FLAT_ATTR_STRIDE, + TYPED_VALUE_STRIDE, +}; +use crate::msgpack_decoder::decode::buffer::Buffer; +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::v1::{AttributeValue, Span, SpanEvent, SpanKind, SpanLink, ThinVec}; +use crate::span::vec_map::VecMap; +use crate::span::DeserializableTraceData; +use rmp::decode; +use std::borrow::Borrow; + +/// Decodes a V1 span (msgpack map with integer keys) into a [`Span`]. +/// +/// The streaming `StringTable` is shared across the whole payload, so interned references in +/// this span can resolve to strings that appeared in an earlier chunk or payload header. +pub(super) fn decode_span( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("Unable to read V1 span map len".to_owned()))?; + + let mut span = Span::::default(); + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span key (u8) read failure".to_owned()))?; + + match key { + span_key::SERVICE => span.service = read_interned_string(buf, table)?, + span_key::NAME => span.name = read_interned_string(buf, table)?, + span_key::RESOURCE => span.resource = read_interned_string(buf, table)?, + span_key::SPAN_ID => { + span.span_id = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_id u64 read failure".to_owned()) + })? + } + span_key::PARENT_ID => { + span.parent_id = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 parent_id u64 read failure".to_owned()) + })? + } + span_key::START => { + span.start = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span start u64 read failure".to_owned()) + })? as i64; + } + span_key::DURATION => { + span.duration = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span duration u64 read failure".to_owned()) + })? as i64; + } + span_key::ERROR => { + span.error = decode::read_bool(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span error bool read failure".to_owned()) + })?; + } + span_key::TYPE => span.r#type = read_interned_string(buf, table)?, + span_key::ATTRIBUTES => span.attributes = read_attributes_map(buf, table)?, + span_key::SPAN_LINKS => span.span_links = read_span_links(buf, table)?, + span_key::SPAN_EVENTS => span.span_events = read_span_events(buf, table)?, + span_key::ENV => span.env = read_interned_string(buf, table)?, + span_key::VERSION => span.version = read_interned_string(buf, table)?, + span_key::COMPONENT => span.component = read_interned_string(buf, table)?, + span_key::KIND => { + let kind: u32 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_kind u32 read failure".to_owned()) + })?; + // OTEL spec: unset / unknown → Internal. + span.span_kind = match kind { + 2 => SpanKind::Server, + 3 => SpanKind::Client, + 4 => SpanKind::Producer, + 5 => SpanKind::Consumer, + _ => SpanKind::Internal, + }; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 span key: {unknown}" + ))); + } + } + } + + span.attributes.mark_deduped(); + + Ok(span) +} + +/// Reads a V1 attributes map encoded as a flat array of `[key, type_uint8, value, ...]` triplets. +pub(super) fn read_attributes_map( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result>, DecodeError> +where + T::Text: Clone, +{ + let flat_len = decode::read_array_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attributes flat array len read failure".to_owned()) + })?; + + if flat_len % FLAT_ATTR_STRIDE != 0 { + return Err(DecodeError::InvalidFormat(format!( + "V1 attributes flat array length {flat_len} is not a multiple of {FLAT_ATTR_STRIDE}" + ))); + } + + let entries = (flat_len / FLAT_ATTR_STRIDE) as usize; + let mut map = VecMap::with_capacity(entries); + + for _ in 0..entries { + let key = read_interned_string(buf, table)?; + let value = read_typed_attribute_value(buf, table)?; + map.insert(key, value); + } + + map.mark_deduped(); + + Ok(map) +} + +/// Reads `[type_uint8, value]` and dispatches by type discriminant. Recurses into `Array` and +/// `KeyValueList`. +pub(super) fn read_typed_attribute_value( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let ty = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute type discriminant read failure".to_owned()) + })?; + + match ty { + ANY_VALUE_KEY_STRING => Ok(AttributeValue::String(read_interned_string(buf, table)?)), + ANY_VALUE_KEY_BOOL => { + let b = decode::read_bool(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute Bool read failure".to_owned()) + })?; + Ok(AttributeValue::Bool(b)) + } + ANY_VALUE_KEY_DOUBLE => { + let f = decode::read_f64(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute Double read failure".to_owned()) + })?; + Ok(AttributeValue::Float(f)) + } + ANY_VALUE_KEY_INT64 => { + // Encoder writes signed via `write_sint`, which can emit any int marker. `read_int` + // accepts any integer marker that fits in i64. + let i: i64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute Int64 read failure".to_owned()) + })?; + Ok(AttributeValue::Int(i)) + } + ANY_VALUE_KEY_BYTES => Ok(AttributeValue::Bytes(read_bin(buf)?)), + ANY_VALUE_KEY_ARRAY => { + let array_len_with_stride = + decode::read_array_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 attribute Array len read failure".to_owned()) + })?; + // Array is a flat sequence of [type, value] pairs. + if array_len_with_stride % TYPED_VALUE_STRIDE != 0 { + return Err(DecodeError::InvalidFormat(format!( + "V1 attribute Array length {array_len_with_stride} is not a multiple of {TYPED_VALUE_STRIDE}" + ))); + } + let n = (array_len_with_stride / TYPED_VALUE_STRIDE) as usize; + let mut items = Vec::with_capacity(n); + for _ in 0..n { + items.push(read_typed_attribute_value(buf, table)?); + } + Ok(AttributeValue::List(items)) + } + ANY_VALUE_KEY_KEY_VALUE_LIST => Ok(AttributeValue::KeyValue(read_attributes_map( + buf, table, + )?)), + unknown => Err(DecodeError::InvalidFormat(format!( + "Unknown V1 AnyValue type discriminant: {unknown}" + ))), + } +} + +/// Reads a msgpack `bin` and slices the matching range out of the buffer. +fn read_bin(buf: &mut Buffer) -> Result { + let len = decode::read_bin_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 bin len read failure".to_owned()))?; + buf.try_slice_and_advance(len as usize) + .ok_or_else(|| DecodeError::InvalidFormat("V1 bin payload truncated".to_owned())) +} + +/// Reads the span_links array. The `SpanLinks` map key has already been consumed by the caller. +pub(super) fn read_span_links( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result>, DecodeError> +where + T::Text: Clone, +{ + let count = decode::read_array_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span_links len read failure".to_owned()))?; + let mut links = ThinVec::with_capacity(count as usize); + for _ in 0..count { + links.push(decode_span_link(buf, table)?); + } + Ok(links) +} + +fn decode_span_link( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_link map len read failure".to_owned()) + })?; + let mut link = SpanLink::::default(); + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_link key (u8) read failure".to_owned()) + })?; + match key { + span_link_key::TRACE_ID => { + let bytes = read_bin(buf)?; + let slice: &[u8] = bytes.borrow(); + if slice.len() != 16 { + return Err(DecodeError::InvalidFormat(format!( + "V1 span_link trace_id expected 16 bytes, got {}", + slice.len() + ))); + } + let mut arr = [0u8; 16]; + arr.copy_from_slice(slice); + link.trace_id = arr; + } + span_link_key::SPAN_ID => { + link.span_id = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_link span_id read failure".to_owned()) + })?; + } + span_link_key::ATTRIBUTES => { + link.attributes = read_attributes_map(buf, table)?; + } + span_link_key::TRACE_STATE => { + link.tracestate = read_interned_string(buf, table)?; + } + span_link_key::FLAGS => { + let v: u64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_link flags read failure".to_owned()) + })?; + link.flags = v as u32; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 span_link key: {unknown}" + ))); + } + } + } + Ok(link) +} + +pub(super) fn read_span_events( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result>, DecodeError> +where + T::Text: Clone, +{ + let count = decode::read_array_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span_events len read failure".to_owned()))?; + let mut events = ThinVec::with_capacity(count as usize); + for _ in 0..count { + events.push(decode_span_event(buf, table)?); + } + Ok(events) +} + +fn decode_span_event( + buf: &mut Buffer, + table: &mut StringTable, +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let map_len = decode::read_map_len(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_event map len read failure".to_owned()) + })?; + let mut event = SpanEvent::::default(); + + for _ in 0..map_len { + let key = decode::read_int::(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_event key (u8) read failure".to_owned()) + })?; + match key { + span_event_key::TIME => { + event.time_unix_nano = decode::read_int(buf.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat("V1 span_event time read failure".to_owned()) + })?; + } + span_event_key::NAME => { + event.name = read_interned_string(buf, table)?; + } + span_event_key::ATTRIBUTES => { + event.attributes = read_attributes_map(buf, table)?; + } + unknown => { + return Err(DecodeError::InvalidFormat(format!( + "Unknown V1 span_event key: {unknown}" + ))); + } + } + } + Ok(event) +} diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index 5a219aa665..cb6e7fe2f0 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -394,29 +394,6 @@ impl SendData { endpoint.as_ref(), )); } - // TracerPayloadCollection::V1(payload) => { - // // V0.4-shaped spans re-encoded as a V1 msgpack payload at send time. Used by - // // the sidecar when the upstream SDK only speaks v0.4 but the agent advertises - // // `/v1.0/traces`. `extract_payload_attrs` will pull env/hostname/app_version - // // from span meta tags since the SDK propagates them there for v0.4 payloads. - // #[allow(clippy::unwrap_used)] - // let chunks = u64::try_from(self.tracer_payloads.size()).unwrap(); - // let mut headers = self.headers.clone(); - // headers.reserve(2); - // headers.insert(DATADOG_TRACE_COUNT, chunks.into()); - // headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK); - - // let metadata = crate::tracer_metadata::TracerMetadata::default(); - // let payload = msgpack_encoder::v1::to_vec_from_payload_v1(payload); - - // futures.push(self.send_payload( - // capabilities, - // chunks, - // payload, - // headers, - // endpoint.as_ref(), - // )); - // } TracerPayloadCollection::V05(payload) => { #[allow(clippy::unwrap_used)] let chunks = u64::try_from(self.tracer_payloads.size()).unwrap(); @@ -430,6 +407,24 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; + futures.push(self.send_payload( + capabilities, + chunks, + payload, + headers, + endpoint.as_ref(), + )); + } + TracerPayloadCollection::V1(payload) => { + #[allow(clippy::unwrap_used)] + let chunks = u64::try_from(self.tracer_payloads.size()).unwrap(); + let mut headers = self.headers.clone(); + headers.reserve(2); + headers.insert(DATADOG_TRACE_COUNT, chunks.into()); + headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK); + + let payload = msgpack_encoder::v1::to_vec_from_payload_v1(payload); + futures.push(self.send_payload( capabilities, chunks, @@ -560,10 +555,9 @@ mod tests { msgpack_encoder::v04::to_encoded_byte_len(payloads) as usize } TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(), - // TracerPayloadCollection::V1(payloads) => { - // let metadata = crate::tracer_metadata::TracerMetadata::default(); - // msgpack_encoder::v1::to_encoded_byte_len(payloads, &metadata) as usize - // } + TracerPayloadCollection::V1(payload) => { + msgpack_encoder::v1::to_encoded_byte_len_from_payload_v1(payload) as usize + } } } diff --git a/libdd-trace-utils/src/tracer_payload.rs b/libdd-trace-utils/src/tracer_payload.rs index 2bd419ef10..d9f10a28ea 100644 --- a/libdd-trace-utils/src/tracer_payload.rs +++ b/libdd-trace-utils/src/tracer_payload.rs @@ -2,13 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::span::v05::dict::SharedDict; -use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData}; +use crate::span::{v04, v05, v1, BytesData, SharedDictBytes, TraceData}; use crate::trace_utils::convert_trace_chunks_v04_to_v05; use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads}; use anyhow::Ok; use libdd_trace_protobuf::pb; use std::cmp::Ordering; use std::iter::Iterator; +use tracing::warn; pub type TracerPayloadV04 = Vec; pub type TracerPayloadV05 = Vec; @@ -20,8 +21,7 @@ pub enum TraceEncoding { V04, /// v0.5 encoding (TracerPayloadV05). V05, - /// V1 encoding. Input is decoded as v0.4 (same span shape) and re-encoded as V1 msgpack - /// when sent to the agent. + /// v1 encoding (TracerPayloadV1). V1, } @@ -32,8 +32,7 @@ pub enum TraceChunks { /// Collection of TraceChunkSpan with de-duplicated strings. V05((SharedDict, Vec>)), /// Collection of v0.4 spans to be serialized as a V1 msgpack payload. - V1(Vec>>), - // V1(Vec>>), + V1(v1::TracerPayload), } impl TraceChunks { @@ -41,9 +40,7 @@ impl TraceChunks { match self { TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces), TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces), - // V1 uses the same underlying span structure as V04. - TraceChunks::V1(traces) => TracerPayloadCollection::V04(traces), - // TraceChunks::V1(traces) => TracerPayloadCollection::V1(traces), + TraceChunks::V1(traces) => TracerPayloadCollection::V1(traces), } } } @@ -54,7 +51,7 @@ impl TraceChunks { match self { TraceChunks::V04(traces) => traces.len(), TraceChunks::V05((_, traces)) => traces.len(), - TraceChunks::V1(traces) => traces.len(), + TraceChunks::V1(trace) => trace.chunks.len(), } } } @@ -69,7 +66,7 @@ pub enum TracerPayloadCollection { /// Collection of TraceChunkSpan with de-duplicated strings. V05((SharedDictBytes, Vec>)), // /// V0.4-shaped spans that must be serialized as a V1 msgpack payload on send. - // V1(Vec>), + V1(v1::TracerPayload), } impl TracerPayloadCollection { @@ -100,11 +97,17 @@ impl TracerPayloadCollection { dest.append(src) } } - // TracerPayloadCollection::V1(dest) => { - // if let TracerPayloadCollection::V1(src) = other { - // dest.append(src) - // } - // } + TracerPayloadCollection::V1(dest) => { + if let TracerPayloadCollection::V1(src) = other { + // Same-target SendData entries are coalesced by trace_utils::coalesce_send_data, + // so both V1 payloads typically share tracer-level metadata. If all metadata + // fields match we append `src`'s chunks into `dest`; if any diverge we no-op + // (logging a warning) rather than silently dropping `src`'s metadata. + if metadata_matches_v1(dest, src) { + dest.chunks.append(&mut src.chunks); + } + } + } // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190). #[allow(clippy::unimplemented)] TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"), @@ -157,7 +160,7 @@ impl TracerPayloadCollection { } TracerPayloadCollection::V04(collection) => collection.len(), TracerPayloadCollection::V05((_, collection)) => collection.len(), - // TracerPayloadCollection::V1(collection) => collection.len(), + TracerPayloadCollection::V1(collection) => collection.chunks.len(), } } } @@ -243,7 +246,7 @@ pub fn decode_to_trace_chunks( encoding_type: TraceEncoding, ) -> Result<(TraceChunks, usize), anyhow::Error> { match encoding_type { - TraceEncoding::V04 | TraceEncoding::V1 => { + TraceEncoding::V04 => { let (data, size) = msgpack_decoder::v04::from_bytes(data).map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; Ok((TraceChunks::V04(data), size)) } @@ -251,7 +254,45 @@ pub fn decode_to_trace_chunks( let (data, size) = msgpack_decoder::v05::from_bytes(data).map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; Ok((convert_trace_chunks_v04_to_v05(data)?, size)) } + TraceEncoding::V1 => { + let (data, size) = msgpack_decoder::v1::from_bytes(data).map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; + Ok((TraceChunks::V1(data), size)) + } + } +} + +/// Returns `true` iff every tracer-level metadata string field of `src` matches `dest`. +/// +/// V1 payloads carry tracer metadata (env, hostname, language, …) inside the payload itself, so +/// merging two payloads whose metadata diverges would silently drop one set of values. Callers +/// use this to gate the merge: on a `false` return, append is skipped (no-op) and the two +/// payloads stay separate. A warning is logged listing the diverging fields so the situation +/// is observable rather than silent. +fn metadata_matches_v1( + dest: &v1::TracerPayload, + src: &v1::TracerPayload, +) -> bool { + let differing: Vec<&'static str> = [ + ("language_name", dest.language_name == src.language_name), + ("language_version", dest.language_version == src.language_version), + ("tracer_version", dest.tracer_version == src.tracer_version), + ("runtime_id", dest.runtime_id == src.runtime_id), + ("env", dest.env == src.env), + ("hostname", dest.hostname == src.hostname), + ("app_version", dest.app_version == src.app_version), + ] + .into_iter() + .filter_map(|(label, eq)| (!eq).then_some(label)) + .collect(); + + if !differing.is_empty() { + warn!( + "Skipping V1 TracerPayload append: diverging metadata fields {:?}", + differing + ); + return false; } + true } #[cfg(test)] From 815b11e2c6d09a2fe2ce0819215331777bcb3462 Mon Sep 17 00:00:00 2001 From: Anais Raison Date: Mon, 29 Jun 2026 15:40:20 +0200 Subject: [PATCH 3/3] fix: test and more --- datadog-sidecar-ffi/src/lib.rs | 10 +- datadog-sidecar/src/service/blocking.rs | 7 +- .../src/service/sidecar_interface.rs | 14 +- datadog-sidecar/src/service/sidecar_server.rs | 6 +- .../src/trace_exporter/trace_serializer.rs | 56 ++++++-- .../src/msgpack_decoder/v1/mod.rs | 103 ++++++++++++--- .../src/msgpack_decoder/v1/span.rs | 16 +-- libdd-trace-utils/src/tracer_payload.rs | 22 ++- ...compare_v1_full_payload_snapshot_test.json | 60 +++++++++ libdd-trace-utils/tests/test_send_data.rs | 125 ++++++++++++++++++ 10 files changed, 362 insertions(+), 57 deletions(-) create mode 100644 libdd-trace-utils/tests/snapshots/compare_v1_full_payload_snapshot_test.json diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 566ef08d38..d495d1e04b 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -1109,8 +1109,9 @@ pub unsafe extern "C" fn ddog_sidecar_send_trace_v04_bytes( MaybeError::None } -/// Sends a v0.4-encoded trace to the sidecar via shared memory; the sidecar will re-encode -/// it as a V1 msgpack payload before forwarding to the agent. +/// Sends a V1-encoded trace to the sidecar via shared memory. The sidecar decodes the V1 +/// `TracerPayload`, can inspect it, and re-encodes it as V1 msgpack on the way to the agent's +/// `/v1.0/traces` endpoint. #[no_mangle] #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn ddog_sidecar_send_trace_v1_shm( @@ -1133,8 +1134,9 @@ pub unsafe extern "C" fn ddog_sidecar_send_trace_v1_shm( MaybeError::None } -/// Sends a v0.4-encoded trace as bytes to the sidecar; the sidecar will re-encode it as a -/// V1 msgpack payload before forwarding to the agent. +/// Sends a V1-encoded trace as bytes to the sidecar. The sidecar decodes the V1 `TracerPayload`, +/// can inspect it, and re-encodes it as V1 msgpack on the way to the agent's `/v1.0/traces` +/// endpoint. #[no_mangle] #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn ddog_sidecar_send_trace_v1_bytes( diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index 9506e575bd..0692651e3c 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -294,7 +294,8 @@ pub fn send_trace_v04_shm( Ok(()) } -/// Sends a v0.4-encoded trace as bytes; the sidecar re-encodes it as V1 before forwarding. +/// Sends a V1-encoded trace as bytes. The sidecar decodes the V1 payload, can inspect it, and +/// re-encodes it as V1 msgpack on the way to the agent's `/v1.0/traces` endpoint. pub fn send_trace_v1_bytes( transport: &mut SidecarTransport, instance_id: &InstanceId, @@ -305,8 +306,8 @@ pub fn send_trace_v1_bytes( Ok(()) } -/// Sends a v0.4-encoded trace via shared memory; the sidecar re-encodes it as V1 before -/// forwarding. +/// Sends a V1-encoded trace via shared memory. The sidecar decodes the V1 payload, can inspect +/// it, and re-encodes it as V1 msgpack on the way to the agent's `/v1.0/traces` endpoint. pub fn send_trace_v1_shm( transport: &mut SidecarTransport, instance_id: &InstanceId, diff --git a/datadog-sidecar/src/service/sidecar_interface.rs b/datadog-sidecar/src/service/sidecar_interface.rs index 7e54d260d2..15dabadc70 100644 --- a/datadog-sidecar/src/service/sidecar_interface.rs +++ b/datadog-sidecar/src/service/sidecar_interface.rs @@ -129,9 +129,9 @@ pub trait SidecarInterface { headers: SerializedTracerHeaderTags, ); - /// Sends a v0.4-encoded trace via shared memory; the sidecar re-encodes it as a V1 - /// msgpack payload before forwarding to the agent. Use this when the upstream SDK only - /// speaks v0.4 but the agent advertises `/v1.0/traces`. + /// Sends a V1-encoded trace via shared memory. The sidecar decodes the V1 `TracerPayload`, + /// can inspect it, and re-encodes it as V1 msgpack on the way to the agent's + /// `/v1.0/traces` endpoint. Use this when the SDK speaks V1 natively. /// /// # Arguments /// @@ -146,14 +146,14 @@ pub trait SidecarInterface { headers: SerializedTracerHeaderTags, ); - /// Sends a v0.4-encoded trace as bytes; the sidecar re-encodes it as a V1 msgpack payload - /// before forwarding to the agent. Use this when the upstream SDK only speaks v0.4 but - /// the agent advertises `/v1.0/traces`. + /// Sends a V1-encoded trace as bytes. The sidecar decodes the V1 `TracerPayload`, can + /// inspect it, and re-encodes it as V1 msgpack on the way to the agent's `/v1.0/traces` + /// endpoint. Use this when the SDK speaks V1 natively. /// /// # Arguments /// /// * `instance_id` - The ID of the instance. - /// * `data` - The v0.4 trace data serialized as bytes. + /// * `data` - The V1 trace data serialized as bytes. /// * `headers` - The serialized headers from the tracer. async fn send_trace_v1_bytes( instance_id: InstanceId, diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 5cdac56cc3..26f788422e 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -253,9 +253,9 @@ impl SidecarServer { self.send_trace(headers, data, target, retry_interval, TraceEncoding::V04) } - /// Re-encode entry point for the V1 path. Input bytes are still v0.4 msgpack from the SDK; - /// the [`TraceEncoding::V1`] tag tells [`SendData`] to encode the wire payload as V1 before - /// forwarding to the agent. + /// Entry point for the V1 trace path. Input bytes are a V1 msgpack `TracerPayload` from the + /// SDK; the [`TraceEncoding::V1`] tag drives [`decode_to_trace_chunks`] to the V1 decoder, + /// and [`SendData`] then re-encodes the same shape as V1 on the wire to the agent. fn send_trace_v1( &self, headers: &SerializedTracerHeaderTags, diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 8ee6246a69..83d24dcc5a 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -59,7 +59,7 @@ impl TraceSerializer { let chunks = payload.size(); let headers = self.build_traces_headers(header_tags, chunks, agent_payload_response_version); - let mp_payload = self.serialize_payload(&payload, metadata)?; + let mp_payload = self.serialize_payload(&payload, metadata, output_format)?; Ok(PreparedTracesPayload { data: mp_payload, @@ -78,8 +78,17 @@ impl TraceSerializer { TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) }; match output_format { - TraceExporterOutputFormat::V1 => todo!("Implement V1 trace collection"), - TraceExporterOutputFormat::V04 => Ok(tracer_payload::TraceChunks::V04(traces)), + // v0.4 input spans are kept as-is in `TraceChunks::V04`. Whether they go out as v0.4 + // or are cross-encoded into V1 on the wire is decided in `serialize_payload`. + // + // APMSP-2812 - TODO: when the data-pipeline gains a V1-native input model (its own + // `v1::Span`-shaped builder), route `OutputFormat::V1` to + // `TraceChunks::V1(v1::TracerPayload)` instead and serialize via + // `to_vec_from_payload_v1`. A `StatSpan` impl on `v1::Span` will also be needed + // if client-side stats are enabled on the V1-native path. + TraceExporterOutputFormat::V04 | TraceExporterOutputFormat::V1 => { + Ok(tracer_payload::TraceChunks::V04(traces)) + } TraceExporterOutputFormat::V05 => { trace_utils::convert_trace_chunks_v04_to_v05(traces).map_err(map_err) } @@ -111,23 +120,42 @@ impl TraceSerializer { &self, payload: &tracer_payload::TraceChunks, metadata: &TracerMetadata, + output_format: TraceExporterOutputFormat, ) -> Result, TraceExporterError> { let capacity = self .previous_serialised_len .load(Ordering::Relaxed) .max(MIN_BUFFER_CAPACITY); - let buff = match payload { - tracer_payload::TraceChunks::V04(p) => { + let buff = match (payload, output_format) { + (tracer_payload::TraceChunks::V04(p), TraceExporterOutputFormat::V04) => { msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32) } - tracer_payload::TraceChunks::V05(p) => { + // v0.4 spans cross-encoded as V1 on the wire — used when the agent advertises + // /v1.0/traces. Same in-memory shape as the v0.4 native path, different encoder. + (tracer_payload::TraceChunks::V04(p), TraceExporterOutputFormat::V1) => { + msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata) + } + (tracer_payload::TraceChunks::V05(p), TraceExporterOutputFormat::V05) => { let mut buff = Vec::with_capacity(capacity); rmp_serde::encode::write(&mut buff, p) .map_err(TraceExporterError::Serialization)?; buff } - tracer_payload::TraceChunks::V1(_) => { - todo!("Implement V1 payload serialization with metadata : {:?}", metadata) + // APMSP-2812 - TODO: native V1 input — call `msgpack_encoder::v1::to_vec_from_payload_v1` + // on the carried `v1::TracerPayload`. Not yet reachable: `collect_and_process_traces` + // never produces `TraceChunks::V1` in the current data-pipeline path. + (tracer_payload::TraceChunks::V1(_), TraceExporterOutputFormat::V1) => { + todo!("Native V1 input serialization not yet implemented (APMSP-2812)") + } + // `collect_and_process_traces` only produces (V04, V04|V1), (V05, V05), + // or (V1, V1) — any other combination here is a programming error. + _ => { + return Err(TraceExporterError::Deserialization( + DecodeError::InvalidFormat( + "Unsupported (TraceChunks, OutputFormat) combination for serialization" + .to_owned(), + ), + )); } }; self.previous_serialised_len @@ -275,7 +303,11 @@ mod tests { .collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V04) .unwrap(); - let result = serializer.serialize_payload(&payload, &TracerMetadata::default()); + let result = serializer.serialize_payload( + &payload, + &TracerMetadata::default(), + TraceExporterOutputFormat::V04, + ); assert!(result.is_ok()); let serialized = result.unwrap(); @@ -310,7 +342,11 @@ mod tests { .collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V05) .unwrap(); - let result = serializer.serialize_payload(&payload, &TracerMetadata::default()); + let result = serializer.serialize_payload( + &payload, + &TracerMetadata::default(), + TraceExporterOutputFormat::V05, + ); assert!(result.is_ok()); let serialized = result.unwrap(); diff --git a/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs index 0dd3046aeb..a91568bd08 100644 --- a/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/v1/mod.rs @@ -138,7 +138,9 @@ where { let slice: &[u8] = buf.as_mut_slice(); let marker_byte = *slice.first().ok_or_else(|| { - DecodeError::InvalidFormat("Unexpected end of V1 buffer when reading interned string".to_owned()) + DecodeError::InvalidFormat( + "Unexpected end of V1 buffer when reading interned string".to_owned(), + ) })?; // msgpack markers: @@ -154,9 +156,7 @@ where Ok(table.record(s)) } else if is_uint { let id: u64 = decode::read_int(buf.as_mut_slice()).map_err(|_| { - DecodeError::InvalidFormat( - "V1 interned string reference uint read failure".to_owned(), - ) + DecodeError::InvalidFormat("V1 interned string reference uint read failure".to_owned()) })?; table.resolve(id) } else { @@ -170,8 +170,8 @@ where /// /// # Returns /// -/// * `Ok((payload, payload_size))` — the decoded payload and the number of bytes consumed from -/// the buffer. +/// * `Ok((payload, payload_size))` — the decoded payload and the number of bytes consumed from the +/// buffer. /// * `Err(DecodeError)` — if the payload is malformed. /// /// # Errors @@ -213,9 +213,8 @@ fn decode_payload( where T::Text: Clone, { - let map_len = decode::read_map_len(buf.as_mut_slice()).map_err(|_| { - DecodeError::InvalidFormat("Unable to read V1 payload map len".to_owned()) - })?; + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("Unable to read V1 payload map len".to_owned()))?; let mut payload = TracerPayload::::default(); let mut saw_chunks = false; @@ -233,9 +232,7 @@ where trace_key::LANGUAGE_VERSION => { payload.language_version = read_interned_string(buf, table)? } - trace_key::TRACER_VERSION => { - payload.tracer_version = read_interned_string(buf, table)? - } + trace_key::TRACER_VERSION => payload.tracer_version = read_interned_string(buf, table)?, trace_key::RUNTIME_ID => payload.runtime_id = read_interned_string(buf, table)?, trace_key::ENV_REF => payload.env = read_interned_string(buf, table)?, trace_key::HOSTNAME_REF => payload.hostname = read_interned_string(buf, table)?, @@ -290,9 +287,8 @@ where let mut saw_spans = false; for _ in 0..map_len { - let key = decode::read_int::(buf.as_mut_slice()).map_err(|_| { - DecodeError::InvalidFormat("V1 chunk key (u8) read failure".to_owned()) - })?; + let key = decode::read_int::(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 chunk key (u8) read failure".to_owned()))?; match key { chunk_key::TRACE_ID => { let len = decode::read_bin_len(buf.as_mut_slice()).map_err(|_| { @@ -377,6 +373,7 @@ mod tests { TracerPayloadBytes, }; use crate::span::vec_map::VecMap; + use bolero::check; use libdd_tinybytes::{Bytes, BytesString}; fn bs(s: &str) -> BytesString { @@ -582,4 +579,80 @@ mod tests { _ => panic!("attribute should decode as KeyValue"), } } + + /// Fuzz test: bolero generates random strings + numbers for the V1 payload, the encoder + /// serialises it, and the decoder must accept its own output (no panic, no error). Mirrors + /// the v04 `fuzz_from_bytes` pattern. Bolero caps tuples at 12 fields — extra metadata is + /// either omitted or filled with deterministic defaults. + #[test] + #[cfg_attr(miri, ignore)] + fn fuzz_from_bytes() { + check!() + .with_type::<( + String, // language_name + String, // env (payload-level) + String, // service + String, // name + String, // resource + String, // span env + String, // attr_key + String, // attr_value + u64, // span_id + u64, // parent_id + u64, // start + bool, // error + )>() + .cloned() + .for_each( + |( + lang, + payload_env, + service, + name, + resource, + span_env, + attr_key, + attr_value, + span_id, + parent_id, + start, + error, + )| { + let bs = |s: &str| BytesString::from_slice(s.as_ref()).unwrap(); + let mut attrs = VecMap::>::new(); + attrs.insert(bs(&attr_key), AttributeValue::String(bs(&attr_value))); + + let span = V1SpanBytes { + service: bs(&service), + name: bs(&name), + resource: bs(&resource), + span_id, + parent_id, + start: start as i64, + error, + env: bs(&span_env), + attributes: attrs, + ..Default::default() + }; + + let payload = TracerPayloadBytes { + language_name: bs(&lang), + env: bs(&payload_env), + chunks: vec![TraceChunkBytes { + trace_id: [0xab; 16], + spans: vec![span], + ..Default::default() + }], + ..Default::default() + }; + + let encoded = to_vec_from_payload_v1(&payload); + let result = from_bytes(Bytes::from(encoded)); + assert!( + result.is_ok(), + "decoder rejected its own encoded output: {result:?}" + ); + }, + ); + } } diff --git a/libdd-trace-utils/src/msgpack_decoder/v1/span.rs b/libdd-trace-utils/src/msgpack_decoder/v1/span.rs index a7e8817e6a..16822f959e 100644 --- a/libdd-trace-utils/src/msgpack_decoder/v1/span.rs +++ b/libdd-trace-utils/src/msgpack_decoder/v1/span.rs @@ -186,9 +186,9 @@ where } Ok(AttributeValue::List(items)) } - ANY_VALUE_KEY_KEY_VALUE_LIST => Ok(AttributeValue::KeyValue(read_attributes_map( - buf, table, - )?)), + ANY_VALUE_KEY_KEY_VALUE_LIST => { + Ok(AttributeValue::KeyValue(read_attributes_map(buf, table)?)) + } unknown => Err(DecodeError::InvalidFormat(format!( "Unknown V1 AnyValue type discriminant: {unknown}" ))), @@ -227,9 +227,8 @@ fn decode_span_link( where T::Text: Clone, { - let map_len = decode::read_map_len(buf.as_mut_slice()).map_err(|_| { - DecodeError::InvalidFormat("V1 span_link map len read failure".to_owned()) - })?; + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span_link map len read failure".to_owned()))?; let mut link = SpanLink::::default(); for _ in 0..map_len { @@ -300,9 +299,8 @@ fn decode_span_event( where T::Text: Clone, { - let map_len = decode::read_map_len(buf.as_mut_slice()).map_err(|_| { - DecodeError::InvalidFormat("V1 span_event map len read failure".to_owned()) - })?; + let map_len = decode::read_map_len(buf.as_mut_slice()) + .map_err(|_| DecodeError::InvalidFormat("V1 span_event map len read failure".to_owned()))?; let mut event = SpanEvent::::default(); for _ in 0..map_len { diff --git a/libdd-trace-utils/src/tracer_payload.rs b/libdd-trace-utils/src/tracer_payload.rs index d9f10a28ea..b6cedbc70a 100644 --- a/libdd-trace-utils/src/tracer_payload.rs +++ b/libdd-trace-utils/src/tracer_payload.rs @@ -99,8 +99,9 @@ impl TracerPayloadCollection { } TracerPayloadCollection::V1(dest) => { if let TracerPayloadCollection::V1(src) = other { - // Same-target SendData entries are coalesced by trace_utils::coalesce_send_data, - // so both V1 payloads typically share tracer-level metadata. If all metadata + // Same-target SendData entries are coalesced by + // trace_utils::coalesce_send_data, so both V1 payloads + // typically share tracer-level metadata. If all metadata // fields match we append `src`'s chunks into `dest`; if any diverge we no-op // (logging a warning) rather than silently dropping `src`'s metadata. if metadata_matches_v1(dest, src) { @@ -247,15 +248,21 @@ pub fn decode_to_trace_chunks( ) -> Result<(TraceChunks, usize), anyhow::Error> { match encoding_type { TraceEncoding::V04 => { - let (data, size) = msgpack_decoder::v04::from_bytes(data).map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; + let (data, size) = msgpack_decoder::v04::from_bytes(data).map_err(|e| { + anyhow::format_err!("Error deserializing trace from request body: {e}") + })?; Ok((TraceChunks::V04(data), size)) } TraceEncoding::V05 => { - let (data, size) = msgpack_decoder::v05::from_bytes(data).map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; + let (data, size) = msgpack_decoder::v05::from_bytes(data).map_err(|e| { + anyhow::format_err!("Error deserializing trace from request body: {e}") + })?; Ok((convert_trace_chunks_v04_to_v05(data)?, size)) } TraceEncoding::V1 => { - let (data, size) = msgpack_decoder::v1::from_bytes(data).map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?; + let (data, size) = msgpack_decoder::v1::from_bytes(data).map_err(|e| { + anyhow::format_err!("Error deserializing trace from request body: {e}") + })?; Ok((TraceChunks::V1(data), size)) } } @@ -274,7 +281,10 @@ fn metadata_matches_v1( ) -> bool { let differing: Vec<&'static str> = [ ("language_name", dest.language_name == src.language_name), - ("language_version", dest.language_version == src.language_version), + ( + "language_version", + dest.language_version == src.language_version, + ), ("tracer_version", dest.tracer_version == src.tracer_version), ("runtime_id", dest.runtime_id == src.runtime_id), ("env", dest.env == src.env), diff --git a/libdd-trace-utils/tests/snapshots/compare_v1_full_payload_snapshot_test.json b/libdd-trace-utils/tests/snapshots/compare_v1_full_payload_snapshot_test.json new file mode 100644 index 0000000000..265c35f962 --- /dev/null +++ b/libdd-trace-utils/tests/snapshots/compare_v1_full_payload_snapshot_test.json @@ -0,0 +1,60 @@ +[[ + { + "name": "test_send_data_v1_full_root", + "service": "test-service", + "resource": "/api/users", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "web", + "meta": { + "_dd.origin": "synthetics", + "_dd.p.dm": "-4", + "_dd.p.tid": "0123456789abcdef", + "component": "http", + "env": "test-env", + "http.method": "POST", + "span.kind": "server", + "version": "1.2.3" + }, + "metrics": { + "_sampling_priority_v1": 1, + "http.duration_ms": 42.75, + "http.status_code": 201, + "http.success": 1 + }, + "span_links": [ + { + "trace_id": 18364758544493064720, + "trace_id_high": 81985529216486895, + "span_id": 11574427654092267680, + "attributes": { + "link.reason": "retry" + }, + "tracestate": "dd=t.tid:abc", + "flags": 1 + } + ], + "span_events": [ + { + "time_unix_nano": 1727211691770715042, + "name": "exception", + "attributes": { + "exception.code": { + "type": 2, + "int_value": 500 + }, + "exception.handled": { + "type": 1, + "bool_value": false + }, + "exception.type": { + "type": 0, + "string_value": "RuntimeError" + } + } + } + ], + "duration": 5000, + "start": 1000000 + }]] diff --git a/libdd-trace-utils/tests/test_send_data.rs b/libdd-trace-utils/tests/test_send_data.rs index 39a10b58ff..a3f3070ff2 100644 --- a/libdd-trace-utils/tests/test_send_data.rs +++ b/libdd-trace-utils/tests/test_send_data.rs @@ -627,4 +627,129 @@ mod tracing_integration_tests { // 2 decoded spans. The checked-in snapshot is the canonical equivalent decoded form. test_agent.assert_snapshot(snapshot_name).await; } + + /// Builds a V1 payload exercising the full attribute surface that the current test-agent + /// (v1.56.0) is able to decode: `AttributeValue::String/Bool/Int/Float`, span links with + /// non-empty attributes, span events with non-empty attributes, plus chunk-level attributes. + /// + /// APMSP-3479 - TODO: `AttributeValue::Bytes`, `AttributeValue::List` and + /// `AttributeValue::KeyValue` are deliberately omitted — `ddapm-test-agent` v1.56.0 rejects + /// Bytes with `400 "Bytes values are not supported yet."` and does not handle List / KeyValue. + /// Add them here once test-agent V1 support catches up. + fn make_v1_full_payload(name_prefix: &str) -> libdd_trace_utils::span::v1::TracerPayloadBytes { + use libdd_trace_utils::span::v1::{ + AttributeValue, AttributeValueBytes, SpanBytes as V1SpanBytes, SpanEventBytes, + SpanKind, SpanLinkBytes, TraceChunkBytes, TracerPayloadBytes, + }; + + // Root span attributes — primitives supported by the test-agent today. + let mut root_attrs: VecMap = VecMap::new(); + root_attrs.insert(bs_v1("http.method"), AttributeValue::String(bs_v1("POST"))); + root_attrs.insert(bs_v1("http.status_code"), AttributeValue::Int(201)); + root_attrs.insert(bs_v1("http.success"), AttributeValue::Bool(true)); + root_attrs.insert(bs_v1("http.duration_ms"), AttributeValue::Float(42.75)); + + // Span link with non-empty attributes (String-only — primitives are enough to exercise + // the encoder/decoder paths for link attribute maps). + let mut link_attrs: VecMap = VecMap::new(); + link_attrs.insert(bs_v1("link.reason"), AttributeValue::String(bs_v1("retry"))); + let span_link = SpanLinkBytes { + trace_id: tid_bytes(0x0123_4567_89ab_cdef, 0xfedc_ba98_7654_3210), + span_id: 0xa0a0_a0a0_a0a0_a0a0, + tracestate: bs_v1("dd=t.tid:abc"), + flags: 1, + attributes: link_attrs, + }; + + // Span event with attributes (String + Int + Bool to exercise typed values). + let mut event_attrs: VecMap = VecMap::new(); + event_attrs.insert( + bs_v1("exception.type"), + AttributeValue::String(bs_v1("RuntimeError")), + ); + event_attrs.insert(bs_v1("exception.code"), AttributeValue::Int(500)); + event_attrs.insert(bs_v1("exception.handled"), AttributeValue::Bool(false)); + let span_event = SpanEventBytes { + time_unix_nano: 1_727_211_691_770_715_042, + name: bs_v1("exception"), + attributes: event_attrs, + }; + + let root_span = V1SpanBytes { + service: bs_v1("test-service"), + name: bs_v1(&format!("{name_prefix}_root")), + resource: bs_v1("/api/users"), + r#type: bs_v1("web"), + span_id: 1, + parent_id: 0, + start: 1_000_000, + duration: 5_000, + span_kind: SpanKind::Server, + env: bs_v1("test-env"), + version: bs_v1("1.2.3"), + component: bs_v1("http"), + attributes: root_attrs, + span_links: thin_vec::thin_vec![span_link], + span_events: thin_vec::thin_vec![span_event], + ..Default::default() + }; + + let mut chunk_attrs = VecMap::new(); + chunk_attrs.insert(bs_v1("_dd.p.dm"), AttributeValue::String(bs_v1("-4"))); + chunk_attrs.insert( + bs_v1("_dd.p.tid"), + AttributeValue::String(bs_v1("0123456789abcdef")), + ); + + let chunk = TraceChunkBytes { + trace_id: tid_bytes(0, 0xfeedface), + priority: Some(1), + origin: bs_v1("synthetics"), + sampling_mechanism: Some(4), + attributes: chunk_attrs, + dropped_trace: false, + spans: vec![root_span], + }; + + TracerPayloadBytes { + language_name: bs_v1("test-lang"), + language_version: bs_v1("2.0"), + tracer_version: bs_v1("1.0"), + runtime_id: bs_v1("test-runtime-id-full"), + env: bs_v1("test-env"), + hostname: bs_v1("test-host"), + app_version: bs_v1("1.2.3"), + attributes: VecMap::new(), + chunks: vec![chunk], + } + } + + /// Big V1 integration test: encodes a payload covering every primitive attribute variant + /// the test-agent supports today (String/Bool/Int/Float) plus non-empty span_link and + /// span_event attribute maps, then POSTs to `/v1.0/traces` and asserts the snapshot. + /// + /// APMSP-3479 - TODO: extend this with `AttributeValue::Bytes`, `AttributeValue::List` + /// and `AttributeValue::KeyValue` once `ddapm-test-agent` v1 support catches up. See + /// [`make_v1_full_payload`] for the payload builder and the omitted variants. + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn compare_v1_full_payload_snapshot_test() { + use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_payload_v1; + + let relative_snapshot_path = "libdd-trace-utils/tests/snapshots/"; + let snapshot_name = "compare_v1_full_payload_snapshot_test"; + let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await; + let uri = test_agent + .get_uri_for_endpoint("v1.0/traces", Some(snapshot_name)) + .await; + + test_agent.start_session(snapshot_name, None).await; + + let payload = make_v1_full_payload("test_send_data_v1_full"); + let encoded = to_vec_from_payload_v1(&payload); + + post_v1_payload(uri, encoded).await; + + test_agent.assert_snapshot(snapshot_name).await; + } }