diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 2fafabc97a..346dd63292 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -1731,7 +1731,7 @@ pub unsafe extern "C" fn ddog_send_traces_to_sidecar( // Write traces to the shared memory let mut shm_slice = mapped_shm.as_slice_mut(); let shm_slice_len = shm_slice.len(); - let written = match msgpack_encoder::v04::write_to_slice(&mut shm_slice, traces) { + let written = match msgpack_encoder::v04::write_to_slice_from_v04(&mut shm_slice, traces) { Ok(()) => shm_slice_len - shm_slice.len(), Err(_) => { tracing::error!("Failed serializing the traces"); @@ -1761,7 +1761,7 @@ pub unsafe extern "C" fn ddog_send_traces_to_sidecar( match blocking::send_trace_v04_bytes( &mut parameters.transport, ¶meters.instance_id, - msgpack_encoder::v04::to_vec_with_capacity(traces, written as u32), + msgpack_encoder::v04::to_vec_with_capacity_from_v04(traces, written as u32), check!( (¶meters.tracer_headers_tags).try_into(), "Failed to convert tracer headers tags" diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 9c2970a475..154b0df208 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -795,6 +795,7 @@ impl TraceExporterBuilder { app_version: self.app_version, runtime_id, service: self.service, + container_id: String::new(), }, input_format: self.input_format, output_format: self.output_format, diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 52e42dcd67..39ae528337 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -1417,7 +1417,7 @@ mod tests { span_id: 2, ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let resp = 
exporter.send(data.as_ref()).unwrap(); assert!(matches!(resp, AgentResponse::Unchanged)); @@ -1458,7 +1458,7 @@ mod tests { name: BytesString::from_slice(b"test").unwrap(), ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); // `send` is synchronous and, in log mode, returns after writing through the // capability without initiating any HTTP; combined with the structural assert // above this is deterministic (no background worker can race the mock). @@ -1499,7 +1499,7 @@ mod tests { ..Default::default() }], ]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let _result = exporter.send(data.as_ref()).expect("failed to send trace"); @@ -1599,7 +1599,7 @@ mod tests { name: BytesString::from_slice(b"test").unwrap(), ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let result = exporter.send(data.as_ref()); assert!(result.is_err()); @@ -1707,7 +1707,7 @@ mod tests { name: BytesString::from_slice(b"test").unwrap(), ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let result = exporter.send(data.as_ref()); assert!(result.is_err()); @@ -1811,7 +1811,7 @@ mod tests { name: BytesString::from_slice(b"test").unwrap(), ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let _result = exporter.send(data.as_ref()).expect("failed to send trace"); @@ -1871,7 +1871,7 @@ mod tests { name: BytesString::from_slice(b"test").unwrap(), ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let result = exporter.send(data.as_ref()).unwrap(); assert_eq!( @@ -1913,7 +1913,7 @@ mod tests { name: 
BytesString::from_slice(b"test").unwrap(), ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let code = match exporter.send(data.as_ref()).unwrap_err() { TraceExporterError::Request(e) => Some(e.status()), _ => None, @@ -1948,7 +1948,7 @@ mod tests { name: BytesString::from_slice(b"test").unwrap(), ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let err = exporter.send(data.as_ref()); assert!(err.is_err()); @@ -2293,7 +2293,7 @@ mod tests { ..Default::default() }]; - let data = msgpack_encoder::v04::to_vec(&[trace_chunk]); + let data = msgpack_encoder::v04::to_vec_from_v04(&[trace_chunk]); // Wait for the info fetcher to get the config while mock_info.calls() == 0 { @@ -2361,7 +2361,7 @@ mod tests { error: 0, ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let result = exporter.send(data.as_ref()); assert!( @@ -2413,7 +2413,7 @@ mod tests { error: 0, ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); let result = exporter.send(data.as_ref()); assert!( @@ -2470,7 +2470,7 @@ mod tests { duration: 1, ..Default::default() }]]; - let data = msgpack_encoder::v04::to_vec(&traces); + let data = msgpack_encoder::v04::to_vec_from_v04(&traces); exporter.send(data.as_ref()).unwrap(); mock_intake.assert(); } @@ -2541,7 +2541,7 @@ mod single_threaded_tests { ..Default::default() }]; - let data = msgpack_encoder::v04::to_vec(&[trace_chunk]); + let data = msgpack_encoder::v04::to_vec_from_v04(&[trace_chunk]); // Wait for the info fetcher to get the config while agent_info::get_agent_info().is_none() { @@ -2642,7 +2642,7 @@ mod single_threaded_tests { ..Default::default() }]; - let data = 
msgpack_encoder::v04::to_vec(&[trace_chunk]); + let data = msgpack_encoder::v04::to_vec_from_v04(&[trace_chunk]); // Wait for agent_info to be present so that sending a trace will trigger the stats worker // to start @@ -2741,7 +2741,7 @@ mod single_threaded_tests { duration: 10, ..Default::default() }]; - let data = msgpack_encoder::v04::to_vec(&[trace_chunk]); + let data = msgpack_encoder::v04::to_vec_from_v04(&[trace_chunk]); let _ = exporter.send(data.as_ref()); let start = std::time::Instant::now(); @@ -2849,7 +2849,7 @@ mod single_threaded_tests { duration: 10, ..Default::default() }]; - let data = msgpack_encoder::v04::to_vec(&[trace_chunk]); + let data = msgpack_encoder::v04::to_vec_from_v04(&[trace_chunk]); // 1st send: /info has promoted v1_active=true, so this hits /v1.0/traces and 404s. let result1 = exporter.send(&data); diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 7cfd62a5a0..98a0c980f2 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -120,7 +120,7 @@ impl TraceSerializer { .max(MIN_BUFFER_CAPACITY); let buff = match payload { tracer_payload::TraceChunks::V04(p) => { - msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32) + msgpack_encoder::v04::to_vec_with_capacity_from_v04(p, capacity as u32) } tracer_payload::TraceChunks::V05(p) => { let mut buff = Vec::with_capacity(capacity); @@ -129,7 +129,7 @@ impl TraceSerializer { buff } tracer_payload::TraceChunks::V1(p) => { - msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata) + msgpack_encoder::v1::to_vec_with_capacity_from_v04(p, capacity as u32, metadata) } }; self.previous_serialised_len diff --git a/libdd-trace-utils/benches/serialization.rs b/libdd-trace-utils/benches/serialization.rs index 3179b3fa73..8d41e18b70 100644 --- a/libdd-trace-utils/benches/serialization.rs +++ 
b/libdd-trace-utils/benches/serialization.rs @@ -65,7 +65,7 @@ pub fn serialize_internal_to_msgpack(c: &mut Criterion) { b.iter_batched( || vec![0u8; 12_000_000], |mut vec| { - let _ = black_box(msgpack_encoder::v04::write_to_slice( + let _ = black_box(msgpack_encoder::v04::write_to_slice_from_v04( &mut vec.as_mut_slice(), black_box(&data), )); diff --git a/libdd-trace-utils/src/msgpack_encoder/mod.rs b/libdd-trace-utils/src/msgpack_encoder/mod.rs index 2d54a03349..2370de9071 100644 --- a/libdd-trace-utils/src/msgpack_encoder/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/mod.rs @@ -1,6 +1,32 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +//! # Encoder layout & naming convention +//! +//! ```text +//! msgpack_encoder/ +//! ├── v04/ +//! │ ├── mod.rs // public API + payload-level helpers +//! │ ├── span_v04.rs // v0.4 in-memory Span → v0.4 wire (native) +//! │ └── span_v1.rs // v1 in-memory Span → v0.4 wire (downgrade) +//! └── v1/ +//! ├── mod.rs +//! ├── span_v04.rs // v0.4 in-memory Span → V1 wire (upgrade) +//! └── span_v1.rs // v1 in-memory Span → V1 wire (native) +//! ``` +//! +//! - **Module (`v04`/`v1`) = output wire format.** +//! - **File suffix (`_v04`/`_v1`) = input span type.** +//! - **Public functions carry a `_from_v04` / `_from_v1` suffix**, so a caller reads the *output* from the +//! module path and the *input* from the function name: +//! +//! | Module | Function | Input → Output | +//! |--------|----------|----------------| +//! | `v04::` | `to_vec_from_v04`, `write_to_slice_from_v04`, `to_encoded_byte_len_from_v04` | v04 → v0.4 (native) | +//! | `v04::` | `to_vec_from_v1`, `write_to_slice_from_v1`, `to_encoded_byte_len_from_v1` | v1 → v0.4 (downgrade) | +//! | `v1::` | `to_vec_from_v04`, `write_to_slice_from_v04`, `to_encoded_byte_len_from_v04` | v04 → V1 (upgrade) | +//! 
| `v1::` | `to_vec_from_v1`, `write_to_slice_from_v1`, `to_encoded_byte_len_from_v1` | v1 → V1 (native) | + pub mod v04; pub mod v1; diff --git a/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs index 8d89567f0a..652f008e99 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs @@ -2,11 +2,70 @@ // SPDX-License-Identifier: Apache-2.0 use crate::span::v04::Span; +use crate::span::v1::TracerPayload; use crate::span::TraceData; use libdd_common::ResultInfallibleExt; use rmp::encode::{write_array_len, ByteBuf, RmpWrite, ValueWriteError}; -mod span; +const fn msgpack_string_encoding_len(s: &str) -> usize { + let length_marker_len = if s.len() < 32 { + 1 + } else if s.len() < 256 { + 2 + } else if s.len() <= (u16::MAX as usize) { + 3 + } else { + 5 + }; + length_marker_len + s.len() +} + +// Compute the encoding of a string to msgpack in a const manner +const fn msgpack_const_string_encoding(s: &str) -> [u8; ENCODING_LEN] { + // copy_to_slice is not const yet, so we make a helper + const fn copy_to_slice(dest: &mut [u8], src: &[u8], n: usize) { + let mut i = 0; + while i < n { + dest[i] = src[i]; + i += 1; + } + } + + let mut storage = [0; ENCODING_LEN]; + let len = s.len() as u64; + let len_bytes = if len < 32 { + storage[0] = 0xa0 | (len as u8 & 0x1f); + 0 + } else if len < 256 { + storage[0] = 0xd9; + 1 + } else if len <= (u16::MAX as u64) { + storage[0] = 0xda; + 2 + } else { + storage[0] = 0xdb; + 4 + }; + copy_to_slice(storage.split_at_mut(1).1, &len.to_be_bytes(), len_bytes); + copy_to_slice(storage.split_at_mut(1 + len_bytes).1, s.as_bytes(), s.len()); + storage +} + +macro_rules! 
write_const_msgpack_str { + ($writer:expr, $str:expr) => {{ + use rmp::encode::ValueWriteError; + const STRING_ENCODING_LEN: usize = super::msgpack_string_encoding_len($str); + const STRING_ENCODING: [u8; STRING_ENCODING_LEN] = + super::msgpack_const_string_encoding($str); + + $writer + .write_bytes(&STRING_ENCODING) + .map_err(ValueWriteError::InvalidDataWrite) + }}; +} + +mod span_v04; +mod span_v1; #[inline(always)] fn to_writer]>>( @@ -17,7 +76,7 @@ fn to_writer]>>( for trace in traces { write_array_len(writer, trace.as_ref().len() as u32)?; for span in trace.as_ref() { - span::encode_span(writer, span)?; + span_v04::encode_span(writer, span)?; } } @@ -45,7 +104,7 @@ fn to_writer]>>( /// # Examples /// /// ``` -/// use libdd_trace_utils::msgpack_encoder::v04::write_to_slice; +/// use libdd_trace_utils::msgpack_encoder::v04::write_to_slice_from_v04; /// use libdd_trace_utils::span::v04::SpanSlice; /// /// let mut buffer = vec![0u8; 1024]; @@ -55,9 +114,9 @@ fn to_writer]>>( /// }; /// let traces = vec![vec![span]]; /// -/// write_to_slice(&mut &mut buffer[..], &traces).expect("Encoding failed"); +/// write_to_slice_from_v04(&mut &mut buffer[..], &traces).expect("Encoding failed"); /// ``` -pub fn write_to_slice]>>( +pub fn write_to_slice_from_v04]>>( slice: &mut &mut [u8], traces: &[S], ) -> Result<(), ValueWriteError> { @@ -77,7 +136,7 @@ pub fn write_to_slice]>>( /// # Examples /// /// ``` -/// use libdd_trace_utils::msgpack_encoder::v04::to_vec; +/// use libdd_trace_utils::msgpack_encoder::v04::to_vec_from_v04; /// use libdd_trace_utils::span::v04::SpanSlice; /// /// let span = SpanSlice { @@ -85,12 +144,12 @@ pub fn write_to_slice]>>( /// ..Default::default() /// }; /// let traces = vec![vec![span]]; -/// let encoded = to_vec(&traces); +/// let encoded = to_vec_from_v04(&traces); /// /// assert!(!encoded.is_empty()); /// ``` -pub fn to_vec]>>(traces: &[S]) -> Vec { - to_vec_with_capacity(traces, 0) +pub fn to_vec_from_v04]>>(traces: &[S]) -> Vec { + 
to_vec_with_capacity_from_v04(traces, 0) } /// Serializes traces into a vector of bytes with specified capacity. @@ -107,7 +166,7 @@ pub fn to_vec]>>(traces: &[S]) -> Vec { /// # Examples /// /// ``` -/// use libdd_trace_utils::msgpack_encoder::v04::to_vec_with_capacity; +/// use libdd_trace_utils::msgpack_encoder::v04::to_vec_with_capacity_from_v04; /// use libdd_trace_utils::span::v04::SpanSlice; /// /// let span = SpanSlice { @@ -115,11 +174,11 @@ pub fn to_vec]>>(traces: &[S]) -> Vec { /// ..Default::default() /// }; /// let traces = vec![vec![span]]; -/// let encoded = to_vec_with_capacity(&traces, 1024); +/// let encoded = to_vec_with_capacity_from_v04(&traces, 1024); /// /// assert!(encoded.capacity() >= 1024); /// ``` -pub fn to_vec_with_capacity]>>( +pub fn to_vec_with_capacity_from_v04]>>( traces: &[S], capacity: u32, ) -> Vec { @@ -146,7 +205,7 @@ pub fn to_vec_with_capacity]>>( /// # Examples /// /// ``` -/// use libdd_trace_utils::msgpack_encoder::v04::to_encoded_byte_len; +/// use libdd_trace_utils::msgpack_encoder::v04::to_encoded_byte_len_from_v04; /// use libdd_trace_utils::span::v04::SpanSlice; /// /// let span = SpanSlice { @@ -154,11 +213,11 @@ pub fn to_vec_with_capacity]>>( /// ..Default::default() /// }; /// let traces = vec![vec![span]]; -/// let encoded_len = to_encoded_byte_len(&traces); +/// let encoded_len = to_encoded_byte_len_from_v04(&traces); /// /// assert!(encoded_len > 0); /// ``` -pub fn to_encoded_byte_len]>>(traces: &[S]) -> u32 { +pub fn to_encoded_byte_len_from_v04]>>(traces: &[S]) -> u32 { let mut counter = super::CountLength(0); // `CountLength` impls `std::io::Write` (whose error type is `std::io::Error`, not // `Infallible`), so we can't statically prove infallibility via `unwrap_infallible` @@ -168,3 +227,90 @@ pub fn to_encoded_byte_len]>>(traces: &[S]) -> u let _ = to_writer(&mut counter, traces); counter.0 } + +/// Encodes a [`TracerPayload`] in the v0.4 wire format (downgrade path used when the agent +/// does not 
advertise `/v1.0/traces`). The output is a msgpack array of traces, where each +/// trace is itself a msgpack array of v0.4-shaped spans — matching the existing v0.4 wire +/// format produced by [`to_vec_from_v04`]. Payload-level `env`/`app_version`/`attributes` are propagated +/// into every span (see [`span_v1`]'s mapping table); `payload.hostname` has no v0.4 body +/// equivalent (the agent gets hostname from the `Datadog-Meta-Hostname` header instead) and is +/// intentionally dropped here. +fn encode_payload_from_v1( + writer: &mut W, + payload: &TracerPayload, +) -> Result<(), ValueWriteError> { + use span_v1::{encode_span, ChunkContext}; + + write_array_len(writer, payload.chunks.len() as u32)?; + for chunk in &payload.chunks { + // v0.4 has no wire-level equivalent of `dropped_trace`; the closest historical signal + // is `USER_REJECT` (priority -1), which tells the agent the sampler rejected this trace + // without dropping the spans themselves. Only force it when the chunk doesn't already + // carry a negative (reject-like) priority. + let priority = if chunk.dropped_trace { + Some(chunk.priority.filter(|&p| p < 0).unwrap_or(-1)) + } else { + chunk.priority + }; + let ctx = ChunkContext { + trace_id: &chunk.trace_id, + priority, + origin: &chunk.origin, + sampling_mechanism: chunk.sampling_mechanism, + attributes: &chunk.attributes, + payload_env: &payload.env, + payload_app_version: &payload.app_version, + payload_attributes: &payload.attributes, + }; + write_array_len(writer, chunk.spans.len() as u32)?; + for span in &chunk.spans { + encode_span(writer, span, &ctx)?; + } + } + Ok(()) +} + +/// Serializes a [`TracerPayload`] (V1 data model) as a v0.4 msgpack payload. +/// +/// Used by the trace exporter when the agent has not advertised `/v1.0/traces` via `/info`. 
+/// The output is byte-compatible with [`to_vec_from_v04`] for equivalent data — chunk-level fields are +/// propagated to every span and typed attributes are bucketed into the v0.4 `meta` / +/// `metrics` / `meta_struct` maps per the `span_v1` module's mapping table. +pub fn to_vec_from_v1(payload: &TracerPayload) -> Vec { + to_vec_with_capacity_from_v1(payload, 0) +} + +/// Serializes a [`TracerPayload`] as a v0.4 msgpack payload with a caller-supplied initial +/// capacity. Use this when you can size the buffer up front (e.g. from +/// [`to_encoded_byte_len_from_v1`]) to avoid reallocations. +pub fn to_vec_with_capacity_from_v1( + payload: &TracerPayload, + capacity: u32, +) -> Vec { + let mut buf = ByteBuf::with_capacity(capacity as usize); + encode_payload_from_v1(&mut buf, payload) + .map_err(super::flatten_value_write_infallible) + .unwrap_infallible(); + buf.into_vec() +} + +/// Encodes a [`TracerPayload`] as v0.4 msgpack into the provided slice. Useful for callers +/// that own a pre-sized buffer (e.g. for FFI / zero-copy paths). +/// +/// # Errors +/// +/// Returns any [`ValueWriteError`] from the underlying writer (typically buffer-too-small). +pub fn write_to_slice_from_v1( + slice: &mut &mut [u8], + payload: &TracerPayload, +) -> Result<(), ValueWriteError> { + encode_payload_from_v1(slice, payload) +} + +/// Returns the exact number of bytes [`to_vec_from_v1`] would write for `payload`. Walks +/// the payload through a counting writer without allocating an output buffer. 
+pub fn to_encoded_byte_len_from_v1(payload: &TracerPayload) -> u32 { + let mut counter = super::CountLength(0); + let _ = encode_payload_from_v1(&mut counter, payload); + counter.0 +} diff --git a/libdd-trace-utils/src/msgpack_encoder/v04/span.rs b/libdd-trace-utils/src/msgpack_encoder/v04/span_v04.rs similarity index 67% rename from libdd-trace-utils/src/msgpack_encoder/v04/span.rs rename to libdd-trace-utils/src/msgpack_encoder/v04/span_v04.rs index b4e8f7301c..9d079f3649 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v04/span.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v04/span_v04.rs @@ -1,6 +1,9 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +//! Native v0.4 span encoder: `crate::span::v04::Span` → v0.4 msgpack wire. +//! (Convention documented in [`crate::msgpack_encoder`].) + use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; use crate::span::TraceData; use rmp::encode::{ @@ -9,63 +12,8 @@ use rmp::encode::{ }; use std::borrow::Borrow; -const fn msp_string_encoding_len(s: &str) -> usize { - let length_marker_len = if s.len() < 32 { - 1 - } else if s.len() < 256 { - 2 - } else if s.len() <= (u16::MAX as usize) { - 3 - } else { - 5 - }; - length_marker_len + s.len() -} - -// Compute the encoding of a string to messagepack in a const manner -const fn msp_const_string_encoding(s: &str) -> [u8; ENCODING_LEN] { - // copy_to_slice is not const yet, so we make a helper - const fn copy_to_slice(dest: &mut [u8], src: &[u8], n: usize) { - let mut i = 0; - while i < n { - dest[i] = src[i]; - i += 1; - } - } - - let mut storage = [0; ENCODING_LEN]; - let len = s.len() as u64; - let len_bytes = if len < 32 { - storage[0] = 0xa0 | (len as u8 & 0x1f); - 0 - } else if len < 256 { - storage[0] = 0xd9; - 1 - } else if len <= (u16::MAX as u64) { - storage[0] = 0xda; - 2 - } else { - storage[0] = 0xdb; - 4 - }; - copy_to_slice(storage.split_at_mut(1).1, 
&len.to_be_bytes(), len_bytes); - copy_to_slice(storage.split_at_mut(1 + len_bytes).1, s.as_bytes(), s.len()); - storage -} - -macro_rules! write_const_msg_pack_str { - ($writer:expr, $str:expr) => {{ - use rmp::encode::ValueWriteError; - const STRING_ENCODING_LEN: usize = msp_string_encoding_len($str); - const STRING_ENCODING: [u8; STRING_ENCODING_LEN] = msp_const_string_encoding($str); - - $writer - .write_bytes(&STRING_ENCODING) - .map_err(ValueWriteError::InvalidDataWrite) - }}; -} - -/// Encodes a `SpanLink` object into a slice of bytes. +/// Encodes a [`v04::SpanLink`](crate::span::v04::SpanLink) into the v0.4 msgpack wire format +/// (native encoding: input span shape and output wire format are both v0.4). /// /// # Arguments /// @@ -83,7 +31,7 @@ pub fn encode_span_links( writer: &mut W, span_links: &[SpanLink], ) -> Result<(), ValueWriteError> { - write_const_msg_pack_str!(writer, "span_links")?; + write_const_msgpack_str!(writer, "span_links")?; rmp::encode::write_array_len(writer, span_links.len() as u32)?; for link in span_links.iter() { @@ -94,17 +42,17 @@ pub fn encode_span_links( rmp::encode::write_map_len(writer, link_len)?; - write_const_msg_pack_str!(writer, "trace_id")?; + write_const_msgpack_str!(writer, "trace_id")?; write_u64(writer, link.trace_id)?; - write_const_msg_pack_str!(writer, "trace_id_high")?; + write_const_msgpack_str!(writer, "trace_id_high")?; write_u64(writer, link.trace_id_high)?; - write_const_msg_pack_str!(writer, "span_id")?; + write_const_msgpack_str!(writer, "span_id")?; write_u64(writer, link.span_id)?; if !link.attributes.is_empty() { - write_const_msg_pack_str!(writer, "attributes")?; + write_const_msgpack_str!(writer, "attributes")?; rmp::encode::write_map_len(writer, link.attributes.len() as u32)?; for (k, v) in link.attributes.iter() { write_str(writer, k.borrow())?; @@ -113,12 +61,12 @@ pub fn encode_span_links( } if !link.tracestate.borrow().is_empty() { - write_const_msg_pack_str!(writer, "tracestate")?; + 
write_const_msgpack_str!(writer, "tracestate")?; write_str(writer, link.tracestate.borrow())?; } if link.flags != 0 { - write_const_msg_pack_str!(writer, "flags")?; + write_const_msgpack_str!(writer, "flags")?; write_u32(writer, link.flags)?; } } @@ -126,7 +74,8 @@ pub fn encode_span_links( Ok(()) } -/// Encodes a `SpanEvent` object into a slice of bytes. +/// Encodes a [`v04::SpanEvent`](crate::span::v04::SpanEvent) into the v0.4 msgpack wire format +/// (native encoding: v0.4 input → v0.4 output). /// /// # Arguments /// @@ -144,7 +93,7 @@ pub fn encode_span_events( writer: &mut W, span_events: &[SpanEvent], ) -> Result<(), ValueWriteError> { - write_const_msg_pack_str!(writer, "span_events")?; + write_const_msgpack_str!(writer, "span_events")?; rmp::encode::write_array_len(writer, span_events.len() as u32)?; for event in span_events.iter() { let event_len = 2 /* minimal span event: time_unix_nano, name */ @@ -152,14 +101,14 @@ pub fn encode_span_events( rmp::encode::write_map_len(writer, event_len)?; - write_const_msg_pack_str!(writer, "time_unix_nano")?; + write_const_msgpack_str!(writer, "time_unix_nano")?; write_u64(writer, event.time_unix_nano)?; - write_const_msg_pack_str!(writer, "name")?; + write_const_msgpack_str!(writer, "name")?; write_str(writer, event.name.borrow())?; if !event.attributes.is_empty() { - write_const_msg_pack_str!(writer, "attributes")?; + write_const_msgpack_str!(writer, "attributes")?; rmp::encode::write_map_len(writer, event.attributes.len() as u32)?; for (k, attribute) in event.attributes.iter() { write_str(writer, k.borrow())?; @@ -170,26 +119,26 @@ pub fn encode_span_events( ) -> Result<(), ValueWriteError> { rmp::encode::write_map_len(writer, 2)?; - write_const_msg_pack_str!(writer, "type")?; + write_const_msgpack_str!(writer, "type")?; match value { AttributeArrayValue::String(s) => { write_u8(writer, 0)?; - write_const_msg_pack_str!(writer, "string_value")?; + write_const_msgpack_str!(writer, "string_value")?; 
write_str(writer, s.borrow())?; } AttributeArrayValue::Boolean(bool) => { write_u8(writer, 1)?; - write_const_msg_pack_str!(writer, "bool_value")?; + write_const_msgpack_str!(writer, "bool_value")?; write_bool(writer, *bool).map_err(ValueWriteError::InvalidDataWrite)?; } AttributeArrayValue::Integer(int) => { write_u8(writer, 2)?; - write_const_msg_pack_str!(writer, "int_value")?; + write_const_msgpack_str!(writer, "int_value")?; write_sint(writer, *int)?; } AttributeArrayValue::Double(double) => { write_u8(writer, 3)?; - write_const_msg_pack_str!(writer, "double_value")?; + write_const_msgpack_str!(writer, "double_value")?; write_f64(writer, *double)?; } }; @@ -204,13 +153,13 @@ pub fn encode_span_events( AttributeAnyValue::Array(array) => { rmp::encode::write_map_len(writer, 2)?; - write_const_msg_pack_str!(writer, "type")?; + write_const_msgpack_str!(writer, "type")?; write_u8(writer, 4)?; - write_const_msg_pack_str!(writer, "array_value")?; + write_const_msgpack_str!(writer, "array_value")?; rmp::encode::write_map_len(writer, 1)?; - write_const_msg_pack_str!(writer, "values")?; + write_const_msgpack_str!(writer, "values")?; rmp::encode::write_array_len(writer, array.len() as u32)?; for v in array.iter() { write_array_value(writer, v)?; @@ -224,7 +173,8 @@ pub fn encode_span_events( Ok(()) } -/// Encodes a `Span` object into a slice of bytes. +/// Encodes a [`v04::Span`](crate::span::v04::Span) into the v0.4 msgpack wire format +/// (native encoding: v0.4 input → v0.4 output). 
/// /// # Arguments /// @@ -255,39 +205,39 @@ pub fn encode_span( rmp::encode::write_map_len(writer, span_len)?; - write_const_msg_pack_str!(writer, "service")?; + write_const_msgpack_str!(writer, "service")?; write_str(writer, span.service.borrow())?; - write_const_msg_pack_str!(writer, "name")?; + write_const_msgpack_str!(writer, "name")?; write_str(writer, span.name.borrow())?; - write_const_msg_pack_str!(writer, "resource")?; + write_const_msgpack_str!(writer, "resource")?; write_str(writer, span.resource.borrow())?; - write_const_msg_pack_str!(writer, "trace_id")?; + write_const_msgpack_str!(writer, "trace_id")?; write_u64(writer, span.trace_id as u64)?; - write_const_msg_pack_str!(writer, "span_id")?; + write_const_msgpack_str!(writer, "span_id")?; write_u64(writer, span.span_id)?; if span.parent_id != 0 { - write_const_msg_pack_str!(writer, "parent_id")?; + write_const_msgpack_str!(writer, "parent_id")?; write_u64(writer, span.parent_id)?; } - write_const_msg_pack_str!(writer, "start")?; + write_const_msgpack_str!(writer, "start")?; write_i64(writer, span.start)?; - write_const_msg_pack_str!(writer, "duration")?; + write_const_msgpack_str!(writer, "duration")?; write_sint(writer, span.duration)?; if span.error != 0 { - write_const_msg_pack_str!(writer, "error")?; + write_const_msgpack_str!(writer, "error")?; write_sint(writer, span.error as i64)?; } if !span.meta.is_empty() { - write_const_msg_pack_str!(writer, "meta")?; + write_const_msgpack_str!(writer, "meta")?; let meta_dd = span.meta.defensive_dedup(); rmp::encode::write_map_len(writer, meta_dd.len() as u32)?; for (k, v) in meta_dd.iter() { @@ -297,7 +247,7 @@ pub fn encode_span( } if !span.metrics.is_empty() { - write_const_msg_pack_str!(writer, "metrics")?; + write_const_msgpack_str!(writer, "metrics")?; let metrics_dd = span.metrics.defensive_dedup(); rmp::encode::write_map_len(writer, metrics_dd.len() as u32)?; for (k, v) in metrics_dd.iter() { @@ -307,12 +257,12 @@ pub fn encode_span( } if 
!span.r#type.borrow().is_empty() { - write_const_msg_pack_str!(writer, "type")?; + write_const_msgpack_str!(writer, "type")?; write_str(writer, span.r#type.borrow())?; } if !span.meta_struct.is_empty() { - write_const_msg_pack_str!(writer, "meta_struct")?; + write_const_msgpack_str!(writer, "meta_struct")?; let meta_struct_dd = span.meta_struct.defensive_dedup(); rmp::encode::write_map_len(writer, meta_struct_dd.len() as u32)?; for (k, v) in meta_struct_dd.iter() { diff --git a/libdd-trace-utils/src/msgpack_encoder/v04/span_v1.rs b/libdd-trace-utils/src/msgpack_encoder/v04/span_v1.rs new file mode 100644 index 0000000000..037bfa8f94 --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/v04/span_v1.rs @@ -0,0 +1,1304 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Downgrade encoder: `crate::span::v1::Span` → v0.4 msgpack wire. +//! (Convention documented in [`crate::msgpack_encoder`].) +//! +//! Used when the receiving agent does not advertise the `/v1.0/traces` endpoint and the tracer +//! must fall back to v0.4. The mapping is: +//! +//! | v1::Span field / attribute | v0.4 field | +//! |---------------------------------------|---------------------------------------------| +//! | `env` / `version` / `component` | `meta["env"]` / `meta["version"]` / ... (`env`/`version` fall back to the payload-level `env`/`app_version` when unset on the span) | +//! | `span_kind` | `meta["span.kind"]` (lowercase string) | +//! | `AttributeValue::String` / `Bool` | `meta[k]` (`"true"` / `"false"` for bool) | +//! | `AttributeValue::Float` / `Int` | `metrics[k]` (Int cast to `f64`) | +//! | `AttributeValue::Bytes` | `meta_struct[k]` (raw bytes) | +//! | `AttributeValue::List` | flattened into `meta`/`metrics[k.0]`, `[k.1]`, ... (per element type) | +//! | `AttributeValue::KeyValue` | flattened into `meta`/`metrics[k.a]`, `[k.a.b]`, ... (per member, recursively) | +//! 
| `error: bool` | `error: i32` (`true → 1`, `false → 0`) | +//! | Chunk `trace_id: [u8; 16]` | `trace_id: u64` (low 64) + `meta["_dd.p.tid"]` (hex of high 64, when non-zero) | +//! | Chunk `origin` | `meta["_dd.origin"]` | +//! | Chunk `priority` | `metrics["_sampling_priority_v1"]` | +//! | Chunk `sampling_mechanism` | `meta["_dd.p.dm"]` (`"-{mechanism}"`) | +//! | Chunk `attributes` | Applied to every span in the chunk | +//! | Payload `env` / `app_version` | Fallback for `meta["env"]` / `meta["version"]` when the span leaves them unset | +//! | Payload `attributes` | Applied to every span, lowest precedence (span > chunk > payload) | +//! | Chunk `dropped_trace: true` | Forces `metrics["_sampling_priority_v1"] = -1` (USER_REJECT) unless the chunk's own priority is already negative | +//! +//! An attribute sharing a name with one of the dedicated fields above (`env`, `version`, +//! `component`, `span.kind`, `_dd.p.tid`, `_dd.origin`, `_dd.p.dm`, `_sampling_priority_v1`) is +//! dropped: the dedicated field always wins, so each key is written at most once. + +use crate::span::v1::{AttributeValue, Span, SpanEvent, SpanKind, SpanLink}; +use crate::span::vec_map::VecMap; +use crate::span::TraceData; +use rmp::encode::{ + write_array_len, write_bin, write_bool, write_f64, write_i64, write_map_len, write_sint, + write_str, write_u32, write_u64, write_u8, RmpWrite, ValueWriteError, +}; +use std::borrow::Borrow; +use std::collections::HashSet; + +/// Writes a `bool` as the v0.4 string representation (`"true"` / `"false"`). Used wherever a +/// typed V1 `Bool` attribute is downgraded into v0.4 `meta` (which is `String → String` only). +fn write_bool_as_str( + writer: &mut W, + b: bool, +) -> Result<(), ValueWriteError> { + write_str(writer, if b { "true" } else { "false" }) +} + +/// Reserved v0.4 `meta`/`metrics` key names written from dedicated typed fields (`span.env`, +/// chunk `origin`, ...) rather than from the attribute maps. 
An attribute sharing one of these +/// names would otherwise collide with the dedicated field's entry on the wire; the dedicated +/// field always wins and the same-named attribute is dropped — see `encode_span`. +const PROMOTED_ATTR_KEYS: &[&str] = &[ + "env", + "version", + "component", + "span.kind", + "_dd.p.tid", + "_dd.origin", + "_dd.p.dm", + "_sampling_priority_v1", +]; + +/// Chunk-level context propagated into every span when downgrading to v0.4. Built once per +/// chunk by the top-level encoder and passed by reference to `encode_span_v1_to_v04`. Also +/// carries payload-level fields (`payload_env`, `payload_app_version`, `payload_attributes`), +/// which apply as a fallback when the span itself doesn't set the equivalent field — v0.4 has +/// neither a chunk nor a payload concept, so both levels collapse onto every span. +pub(super) struct ChunkContext<'a, T: TraceData> { + pub trace_id: &'a [u8; 16], + pub priority: Option, + pub origin: &'a T::Text, + pub sampling_mechanism: Option, + pub attributes: &'a VecMap>, + pub payload_env: &'a T::Text, + pub payload_app_version: &'a T::Text, + pub payload_attributes: &'a VecMap>, +} + +/// Maps a `SpanKind` to its v0.4 `span.kind` meta string. Returns `None` for `Internal` so +/// callers can skip emitting the default value. +fn span_kind_to_meta(kind: SpanKind) -> Option<&'static str> { + match kind { + SpanKind::Internal => None, + SpanKind::Server => Some("server"), + SpanKind::Client => Some("client"), + SpanKind::Producer => Some("producer"), + SpanKind::Consumer => Some("consumer"), + } +} + +/// Splits a 128-bit big-endian trace_id into big-endian `(low_64, high_64)`. The low half maps to +/// v0.4's `trace_id` field; the high half goes to `meta["_dd.p.tid"]` as hex when non-zero. 
+#[inline] +fn split_trace_id(trace_id: &[u8; 16]) -> (u64, u64) { + let mut high_bytes = [0u8; 8]; + let mut low_bytes = [0u8; 8]; + high_bytes.copy_from_slice(&trace_id[..8]); + low_bytes.copy_from_slice(&trace_id[8..]); + ( + u64::from_be_bytes(low_bytes), + u64::from_be_bytes(high_bytes), + ) +} + +/// Per-bucket counts for the v0.4 `meta`, `metrics`, and `meta_struct` maps. +#[derive(Default)] +struct BucketCounts { + meta: u32, + metrics: u32, + meta_struct: u32, +} + +/// Recursively flattens a `List`/`KeyValue` attribute into dotted-key leaf entries for the v0.4 +/// `meta` (string-valued) and `metrics` (numeric) maps — matching how intake/the UI expect nested +/// V1 attributes to be exploded: list elements become `key.0`, `key.1`, ... and `KeyValue` +/// members become `key.` (recursively, for nested `KeyValue`/`List` values). Scalars +/// (`String`/`Bool`/`Int`/`Float`) are leaves in their own right and produce a single entry under +/// `key`. `Bytes` has no flattened form; callers must route it to `meta_struct` separately. +fn flatten_attr_into( + key: String, + v: &AttributeValue, + meta_out: &mut Vec<(String, String)>, + metrics_out: &mut Vec<(String, f64)>, +) { + match v { + AttributeValue::String(s) => meta_out.push((key, s.borrow().to_owned())), + AttributeValue::Bool(b) => { + meta_out.push((key, if *b { "true" } else { "false" }.to_owned())) + } + AttributeValue::Int(i) => metrics_out.push((key, *i as f64)), + AttributeValue::Float(f) => metrics_out.push((key, *f)), + AttributeValue::Bytes(_) => { + // Callers filter `Bytes` out before recursing; unreachable in practice. 
+ } + AttributeValue::List(items) => { + for (i, item) in items.iter().enumerate() { + flatten_attr_into(format!("{key}.{i}"), item, meta_out, metrics_out); + } + } + AttributeValue::KeyValue(map) => { + for (k, v) in map.defensive_dedup().iter() { + flatten_attr_into(format!("{key}.{}", k.borrow()), v, meta_out, metrics_out); + } + } + } +} + +/// Encodes a [`v1::Span`](crate::span::v1::Span) into the v0.4 msgpack wire format +/// (downgrade: v1 input → v0.4 output). Chunk-level context (`trace_id`, `origin`, `priority`, +/// `sampling_mechanism`, chunk attributes) is injected into the span's `meta` / `metrics` / +/// `meta_struct` maps since v0.4 has no chunk concept. +/// +/// # Arguments +/// +/// * `writer` - A RmpWriter compatible with rmp writing functions. +/// * `span` - The v1::Span to encode. +/// * `chunk` - Chunk-level context (`trace_id`, `origin`, `priority`, `sampling_mechanism`, chunk +/// attributes) propagated into the span on the v0.4 wire. +/// +/// # Returns +/// +/// * `Ok(())` - Nothing if successful. +/// * `Err(ValueWriteError)` - An error if the writing fails. +/// +/// # Errors +/// +/// This function will return any error emitted by the writer. +pub(super) fn encode_span( + writer: &mut W, + span: &Span, + chunk: &ChunkContext<'_, T>, +) -> Result<(), ValueWriteError> { + let span_attrs_dd = span.attributes.defensive_dedup(); + let chunk_attrs_dd = chunk.attributes.defensive_dedup(); + let payload_attrs_dd = chunk.payload_attributes.defensive_dedup(); + + // Merge span + chunk + payload attributes upfront with explicit "span overrides chunk + // overrides payload" precedence. We don't rely on msgpack map last-write-wins decoding + // here: the v0.4 / msgpack specs do not formalize behavior for duplicate map keys, so we + // emit each key exactly once. + // + // Attributes sharing a name with a "promoted" dedicated field (`env`, `_dd.origin`, ...) + // are dropped here: the dedicated field always wins so we never emit that key twice. 
+ let span_keys: HashSet<&T::Text> = span_attrs_dd.iter().map(|(k, _)| k).collect(); + let mut seen_keys: HashSet<&T::Text> = span_keys.clone(); + let chunk_only: Vec<(&T::Text, &AttributeValue)> = chunk_attrs_dd + .iter() + .filter(|(k, _)| !seen_keys.contains(k)) + .collect(); + seen_keys.extend(chunk_attrs_dd.iter().map(|(k, _)| k)); + let payload_only: Vec<(&T::Text, &AttributeValue)> = payload_attrs_dd + .iter() + .filter(|(k, _)| !seen_keys.contains(k)) + .collect(); + let merged_attrs: Vec<(&T::Text, &AttributeValue)> = span_attrs_dd + .iter() + .chain(chunk_only) + .chain(payload_only) + .filter(|(k, _)| !PROMOTED_ATTR_KEYS.contains(&(*k).borrow())) + .collect(); + + let (trace_id_low, trace_id_high) = split_trace_id(chunk.trace_id); + let kind_meta = span_kind_to_meta(span.span_kind); + + // `env`/`version` fall back to the payload-level value when the span doesn't set its own — + // mirrors how a v1 tracer can set these once at the payload level instead of duplicating + // them on every span/chunk. + let env: &str = if !span.env.borrow().is_empty() { + span.env.borrow() + } else { + chunk.payload_env.borrow() + }; + let version: &str = if !span.version.borrow().is_empty() { + span.version.borrow() + } else { + chunk.payload_app_version.borrow() + }; + + // Flatten every attribute into `meta` (string-valued) / `metrics` (numeric) leaf entries. + // `List` and `KeyValue` have no v0.4 wire representation as a single value, so they are + // exploded into dotted keys (`key.0`, `key.a.b`) the same way intake/the UI expect nested V1 + // attributes — see the mapping table in the module docs. `Bytes` keeps going to + // `meta_struct` since it has no flattened form. 
+ let mut meta_leaves: Vec<(String, String)> = Vec::new(); + let mut metrics_leaves: Vec<(String, f64)> = Vec::new(); + let mut bytes_attrs: Vec<(&T::Text, &T::Bytes)> = Vec::new(); + for &(k, v) in &merged_attrs { + match v { + AttributeValue::Bytes(b) => bytes_attrs.push((k, b)), + _ => flatten_attr_into( + k.borrow().to_owned(), + v, + &mut meta_leaves, + &mut metrics_leaves, + ), + } + } + + // First pass: count bucket sizes so each msgpack map header carries the exact length. + let mut counts = BucketCounts::default(); + counts.meta += !env.is_empty() as u32; + counts.meta += !version.is_empty() as u32; + counts.meta += !span.component.borrow().is_empty() as u32; + counts.meta += kind_meta.is_some() as u32; + counts.meta += (trace_id_high != 0) as u32; + counts.meta += !chunk.origin.borrow().is_empty() as u32; + counts.meta += chunk.sampling_mechanism.is_some() as u32; + counts.meta += meta_leaves.len() as u32; + counts.metrics += chunk.priority.is_some() as u32; + counts.metrics += metrics_leaves.len() as u32; + counts.meta_struct += bytes_attrs.len() as u32; + + let span_len = 7 // service, name, resource, trace_id, span_id, start, duration (always) + + (!span.r#type.borrow().is_empty()) as u32 + + (span.parent_id != 0) as u32 + + span.error as u32 + + (counts.meta > 0) as u32 + + (counts.metrics > 0) as u32 + + (counts.meta_struct > 0) as u32 + + (!span.span_links.is_empty()) as u32 + + (!span.span_events.is_empty()) as u32; + + write_map_len(writer, span_len)?; + + write_const_msgpack_str!(writer, "service")?; + write_str(writer, span.service.borrow())?; + + write_const_msgpack_str!(writer, "name")?; + write_str(writer, span.name.borrow())?; + + write_const_msgpack_str!(writer, "resource")?; + write_str(writer, span.resource.borrow())?; + + write_const_msgpack_str!(writer, "trace_id")?; + write_u64(writer, trace_id_low)?; + + write_const_msgpack_str!(writer, "span_id")?; + write_u64(writer, span.span_id)?; + + if span.parent_id != 0 { + 
write_const_msgpack_str!(writer, "parent_id")?; + write_u64(writer, span.parent_id)?; + } + + write_const_msgpack_str!(writer, "start")?; + write_i64(writer, span.start)?; + + write_const_msgpack_str!(writer, "duration")?; + write_sint(writer, span.duration)?; + + if span.error { + write_const_msgpack_str!(writer, "error")?; + write_sint(writer, 1)?; + } + + if counts.meta > 0 { + write_const_msgpack_str!(writer, "meta")?; + write_map_len(writer, counts.meta)?; + + if !env.is_empty() { + write_const_msgpack_str!(writer, "env")?; + write_str(writer, env)?; + } + if !version.is_empty() { + write_const_msgpack_str!(writer, "version")?; + write_str(writer, version)?; + } + if !span.component.borrow().is_empty() { + write_const_msgpack_str!(writer, "component")?; + write_str(writer, span.component.borrow())?; + } + if let Some(kind_str) = kind_meta { + write_const_msgpack_str!(writer, "span.kind")?; + write_str(writer, kind_str)?; + } + if trace_id_high != 0 { + // Lower-case hex without `0x` prefix — the agent expects this format. 
+ write_const_msgpack_str!(writer, "_dd.p.tid")?; + write_str(writer, &format!("{trace_id_high:016x}"))?; + } + if !chunk.origin.borrow().is_empty() { + write_const_msgpack_str!(writer, "_dd.origin")?; + write_str(writer, chunk.origin.borrow())?; + } + if let Some(mechanism) = chunk.sampling_mechanism { + write_const_msgpack_str!(writer, "_dd.p.dm")?; + write_str(writer, &format!("-{mechanism}"))?; + } + for (k, v) in &meta_leaves { + write_str(writer, k)?; + write_str(writer, v)?; + } + } + + if counts.metrics > 0 { + write_const_msgpack_str!(writer, "metrics")?; + write_map_len(writer, counts.metrics)?; + + if let Some(priority) = chunk.priority { + write_const_msgpack_str!(writer, "_sampling_priority_v1")?; + write_f64(writer, priority as f64)?; + } + for (k, v) in &metrics_leaves { + write_str(writer, k)?; + write_f64(writer, *v)?; + } + } + + if !span.r#type.borrow().is_empty() { + write_const_msgpack_str!(writer, "type")?; + write_str(writer, span.r#type.borrow())?; + } + + if counts.meta_struct > 0 { + write_const_msgpack_str!(writer, "meta_struct")?; + write_map_len(writer, counts.meta_struct)?; + + for &(k, b) in &bytes_attrs { + write_str(writer, k.borrow())?; + write_bin(writer, b.borrow())?; + } + } + + if !span.span_links.is_empty() { + encode_span_links(writer, &span.span_links)?; + } + if !span.span_events.is_empty() { + encode_span_events(writer, &span.span_events)?; + } + + Ok(()) +} + +/// Encodes [`v1::SpanLink`](crate::span::v1::SpanLink)s into the v0.4 msgpack wire format +/// (downgrade: v1 input → v0.4 output). The 128-bit `trace_id` is split into +/// `(trace_id, trace_id_high)` u64s. Typed link attributes are downgraded to strings; +/// non-string-coercible variants are dropped because v0.4 link attributes are `String → String` +/// only. 
+fn encode_span_links( + writer: &mut W, + span_links: &[SpanLink], +) -> Result<(), ValueWriteError> { + write_const_msgpack_str!(writer, "span_links")?; + write_array_len(writer, span_links.len() as u32)?; + + for link in span_links { + let (trace_id_low, trace_id_high) = split_trace_id(&link.trace_id); + let attrs_dd = link.attributes.defensive_dedup(); + let attr_count = attrs_dd + .iter() + .filter(|(_, v)| matches!(v, AttributeValue::String(_) | AttributeValue::Bool(_))) + .count() as u32; + + let link_len = 3 // trace_id, trace_id_high, span_id (always) + + (attr_count > 0) as u32 + + (!link.tracestate.borrow().is_empty()) as u32 + + (link.flags != 0) as u32; + + write_map_len(writer, link_len)?; + + write_const_msgpack_str!(writer, "trace_id")?; + write_u64(writer, trace_id_low)?; + + write_const_msgpack_str!(writer, "trace_id_high")?; + write_u64(writer, trace_id_high)?; + + write_const_msgpack_str!(writer, "span_id")?; + write_u64(writer, link.span_id)?; + + if attr_count > 0 { + write_const_msgpack_str!(writer, "attributes")?; + write_map_len(writer, attr_count)?; + for (k, v) in attrs_dd.iter() { + match v { + AttributeValue::String(s) => { + write_str(writer, k.borrow())?; + write_str(writer, s.borrow())?; + } + AttributeValue::Bool(b) => { + write_str(writer, k.borrow())?; + write_bool_as_str(writer, *b)?; + } + _ => {} + } + } + } + + if !link.tracestate.borrow().is_empty() { + write_const_msgpack_str!(writer, "tracestate")?; + write_str(writer, link.tracestate.borrow())?; + } + + if link.flags != 0 { + write_const_msgpack_str!(writer, "flags")?; + write_u32(writer, link.flags)?; + } + } + + Ok(()) +} + +/// Encodes [`v1::SpanEvent`](crate::span::v1::SpanEvent)s into the v0.4 msgpack wire format +/// (downgrade: v1 input → v0.4 output). Typed attributes are downgraded to the v0.4 +/// `{"type": , "_value": ...}` shape — see `write_event_attr_value`. `Bytes` and +/// `KeyValue` have no v0.4 event-attribute equivalent and are dropped. 
+fn encode_span_events( + writer: &mut W, + span_events: &[SpanEvent], +) -> Result<(), ValueWriteError> { + write_const_msgpack_str!(writer, "span_events")?; + write_array_len(writer, span_events.len() as u32)?; + + for event in span_events { + let attrs_dd = event.attributes.defensive_dedup(); + let attr_count = attrs_dd + .iter() + .filter(|(_, v)| is_supported_event_attr(v)) + .count() as u32; + + let event_len = 2 // time_unix_nano, name (always) + + (attr_count > 0) as u32; + + write_map_len(writer, event_len)?; + + write_const_msgpack_str!(writer, "time_unix_nano")?; + write_u64(writer, event.time_unix_nano)?; + + write_const_msgpack_str!(writer, "name")?; + write_str(writer, event.name.borrow())?; + + if attr_count > 0 { + write_const_msgpack_str!(writer, "attributes")?; + write_map_len(writer, attr_count)?; + for (k, v) in attrs_dd.iter() { + if !is_supported_event_attr(v) { + continue; + } + write_str(writer, k.borrow())?; + write_event_attr_value(writer, v)?; + } + } + } + + Ok(()) +} + +/// Returns `true` when `v` can be downgraded to a v0.4 event-attribute (scalar or scalar list). +fn is_supported_event_attr(v: &AttributeValue) -> bool { + matches!( + v, + AttributeValue::String(_) + | AttributeValue::Bool(_) + | AttributeValue::Int(_) + | AttributeValue::Float(_) + | AttributeValue::List(_) + ) +} + +macro_rules! write_type { + ($writer:expr, $int_type:expr, $str_type:expr) => {{ + write_map_len($writer, 2)?; + write_const_msgpack_str!($writer, "type")?; + write_u8($writer, $int_type)?; + write_str($writer, $str_type)?; + }}; +} + +/// Writes a v0.4 event-attribute value as `{"type": , "..._value": ...}`. Scalars produce a +/// 2-entry map; `List` produces `{"type": 4, "array_value": {"values": [...]}}` with each +/// element written via `write_event_array_element`. 
+fn write_event_attr_value( + writer: &mut W, + v: &AttributeValue, +) -> Result<(), ValueWriteError> { + match v { + AttributeValue::String(s) => { + write_type!(writer, 0, "string_value"); + write_str(writer, s.borrow())?; + } + AttributeValue::Bool(b) => { + write_type!(writer, 1, "bool_value"); + write_bool(writer, *b).map_err(ValueWriteError::InvalidDataWrite)?; + } + AttributeValue::Int(i) => { + write_type!(writer, 2, "int_value"); + write_sint(writer, *i)?; + } + AttributeValue::Float(f) => { + write_type!(writer, 3, "double_value"); + write_f64(writer, *f)?; + } + AttributeValue::List(arr) => { + write_type!(writer, 4, "array_value"); + // Only scalar elements survive the downgrade; nested structural entries are + // skipped because v0.4 array elements must themselves be scalar. + let scalar_elems = arr.iter().filter(|e| is_scalar_array_elem(e)); + let elem_count = scalar_elems.clone().count() as u32; + write_map_len(writer, 1)?; + write_const_msgpack_str!(writer, "values")?; + write_array_len(writer, elem_count)?; + for elem in scalar_elems { + write_event_array_element(writer, elem)?; + } + } + AttributeValue::Bytes(_) | AttributeValue::KeyValue(_) => { + // Filtered upstream by `is_supported_event_attr`; reachable only on a bug. + debug_assert!(false, "unsupported event attribute variant reached writer"); + } + } + Ok(()) +} + +/// Returns `true` when `v` is a scalar that fits in a v0.4 `AttributeArrayValue` (no nesting). +fn is_scalar_array_elem(v: &AttributeValue) -> bool { + matches!( + v, + AttributeValue::String(_) + | AttributeValue::Bool(_) + | AttributeValue::Int(_) + | AttributeValue::Float(_) + ) +} + +/// Writes a v0.4 `AttributeArrayValue` (scalar). Same `{"type", "..._value"}` shape as +/// `write_event_attr_value`, minus the `Array` variant — v0.4 array elements are scalar only. 
+fn write_event_array_element( + writer: &mut W, + v: &AttributeValue, +) -> Result<(), ValueWriteError> { + match v { + AttributeValue::String(s) => { + write_map_len(writer, 2)?; + write_const_msgpack_str!(writer, "type")?; + write_u8(writer, 0)?; + write_const_msgpack_str!(writer, "string_value")?; + write_str(writer, s.borrow())?; + } + AttributeValue::Bool(b) => { + write_map_len(writer, 2)?; + write_const_msgpack_str!(writer, "type")?; + write_u8(writer, 1)?; + write_const_msgpack_str!(writer, "bool_value")?; + write_bool(writer, *b).map_err(ValueWriteError::InvalidDataWrite)?; + } + AttributeValue::Int(i) => { + write_map_len(writer, 2)?; + write_const_msgpack_str!(writer, "type")?; + write_u8(writer, 2)?; + write_const_msgpack_str!(writer, "int_value")?; + write_sint(writer, *i)?; + } + AttributeValue::Float(f) => { + write_map_len(writer, 2)?; + write_const_msgpack_str!(writer, "type")?; + write_u8(writer, 3)?; + write_const_msgpack_str!(writer, "double_value")?; + write_f64(writer, *f)?; + } + _ => { + // Filtered upstream by `is_scalar_array_elem`; reachable only on a bug. + debug_assert!(false, "non-scalar array element reached writer"); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + //! Unit tests for the v1::Span → v0.4 downgrade encoder. Each test encodes a small + //! `TracerPayload` via [`super::super::to_vec_from_v1`] and decodes the bytes with + //! `rmpv` to assert on the resulting v0.4 shape — this implicitly checks that the output + //! is also valid msgpack consumable by any standard v0.4 decoder (test-agent, agent, etc.). 
+ use crate::span::v1::{ + AttributeValue, AttributeValueBytes, SpanBytes, SpanEventBytes, SpanKind, SpanLinkBytes, + TraceChunkBytes, TracerPayloadBytes, + }; + use crate::span::vec_map::VecMap; + use libdd_tinybytes::{Bytes, BytesString}; + use rmpv::Value; + use thin_vec::ThinVec; + + fn bs(s: &str) -> BytesString { + BytesString::from_slice(s.as_bytes()).expect("test string must fit in BytesString") + } + + /// Encodes `payload` and decodes back into `rmpv::Value`. The top level of v0.4 is an + /// array of traces; this helper returns it as a `Vec` so tests can index in. + fn encode_and_decode(payload: &TracerPayloadBytes) -> Vec { + let bytes = super::super::to_vec_from_v1(payload); + let value = rmpv::decode::read_value(&mut &bytes[..]).expect("decode failed"); + match value { + Value::Array(traces) => traces, + other => panic!("expected top-level array, got {other:?}"), + } + } + + /// Looks up `key` in a msgpack `Value::Map`. Returns `None` when absent so callers can + /// distinguish "field missing" from "field empty". + fn map_get<'a>(map: &'a Value, key: &str) -> Option<&'a Value> { + let entries = match map { + Value::Map(m) => m, + other => panic!("expected map, got {other:?}"), + }; + entries + .iter() + .find(|(k, _)| k.as_str() == Some(key)) + .map(|(_, v)| v) + } + + /// Convenience: build a minimal single-chunk single-span payload with the v0.4-equivalent + /// of the canonical "svc/op/res" example. Tests override fields as needed. 
+ fn minimal_payload(trace_id: [u8; 16], span: SpanBytes) -> TracerPayloadBytes { + TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id, + spans: vec![span], + ..Default::default() + }], + ..Default::default() + } + } + + fn minimal_span() -> SpanBytes { + SpanBytes { + service: bs("svc"), + name: bs("op"), + resource: bs("res"), + span_id: 1, + start: 1_000, + duration: 500, + ..Default::default() + } + } + + #[test] + fn basic_span_writes_required_v04_fields() { + let payload = minimal_payload([0u8; 16], minimal_span()); + let traces = encode_and_decode(&payload); + + assert_eq!(traces.len(), 1); + let trace = traces[0].as_array().expect("trace must be array"); + assert_eq!(trace.len(), 1); + let span = &trace[0]; + + assert_eq!(map_get(span, "service").unwrap().as_str(), Some("svc")); + assert_eq!(map_get(span, "name").unwrap().as_str(), Some("op")); + assert_eq!(map_get(span, "resource").unwrap().as_str(), Some("res")); + assert_eq!(map_get(span, "span_id").unwrap().as_u64(), Some(1)); + assert_eq!(map_get(span, "trace_id").unwrap().as_u64(), Some(0)); + assert_eq!(map_get(span, "start").unwrap().as_i64(), Some(1_000)); + assert_eq!(map_get(span, "duration").unwrap().as_i64(), Some(500)); + // Optional fields must be absent when their underlying value is zero/empty. 
+ assert!(map_get(span, "parent_id").is_none()); + assert!(map_get(span, "error").is_none()); + assert!(map_get(span, "type").is_none()); + assert!(map_get(span, "meta").is_none()); + assert!(map_get(span, "metrics").is_none()); + assert!(map_get(span, "meta_struct").is_none()); + } + + #[test] + fn promoted_fields_are_copied_into_meta() { + let span = SpanBytes { + env: bs("prod"), + version: bs("1.2.3"), + component: bs("http"), + span_kind: SpanKind::Server, + ..minimal_span() + }; + let payload = minimal_payload([0u8; 16], span); + let traces = encode_and_decode(&payload); + let meta = map_get(&traces[0][0], "meta").expect("meta must be present"); + + assert_eq!(map_get(meta, "env").unwrap().as_str(), Some("prod")); + assert_eq!(map_get(meta, "version").unwrap().as_str(), Some("1.2.3")); + assert_eq!(map_get(meta, "component").unwrap().as_str(), Some("http")); + assert_eq!(map_get(meta, "span.kind").unwrap().as_str(), Some("server")); + } + + #[test] + fn attribute_sharing_a_promoted_key_name_is_dropped_in_favor_of_the_dedicated_field() { + let mut attrs: VecMap = VecMap::new(); + attrs.insert(bs("env"), AttributeValue::String(bs("staging"))); + attrs.insert(bs("http.method"), AttributeValue::String(bs("GET"))); + let span = SpanBytes { + env: bs("prod"), + attributes: attrs, + ..minimal_span() + }; + let payload = minimal_payload([0u8; 16], span); + let traces = encode_and_decode(&payload); + let meta = map_get(&traces[0][0], "meta").expect("meta present"); + + // The dedicated `span.env` field wins; the colliding attribute is dropped rather than + // producing a duplicate `"env"` key on the wire. + assert_eq!(map_get(meta, "env").unwrap().as_str(), Some("prod")); + assert_eq!(map_get(meta, "http.method").unwrap().as_str(), Some("GET")); + } + + #[test] + fn span_kind_internal_is_not_emitted() { + // Internal is the default and is implied by the absence of `meta["span.kind"]`. 
+ let payload = minimal_payload([0u8; 16], minimal_span()); + let traces = encode_and_decode(&payload); + // meta is None overall since no other field forces it. + assert!(map_get(&traces[0][0], "meta").is_none()); + } + + #[test] + fn trace_id_128_bit_splits_into_low_field_and_high_meta() { + // trace_id = 0x_DEADBEEF__CAFEBABE_DEADBEEF__CAFEBABE (high | low) + let mut tid = [0u8; 16]; + tid[..8].copy_from_slice(&0xDEAD_BEEF_CAFE_BABE_u64.to_be_bytes()); + tid[8..].copy_from_slice(&0x0123_4567_89AB_CDEF_u64.to_be_bytes()); + let payload = minimal_payload(tid, minimal_span()); + let traces = encode_and_decode(&payload); + let span = &traces[0][0]; + + assert_eq!( + map_get(span, "trace_id").unwrap().as_u64(), + Some(0x0123_4567_89AB_CDEF) + ); + let meta = map_get(span, "meta").expect("meta must be present (carries _dd.p.tid)"); + assert_eq!( + map_get(meta, "_dd.p.tid").unwrap().as_str(), + Some("deadbeefcafebabe"), + "high 64 bits must be encoded as lower-case hex without the 0x prefix" + ); + } + + #[test] + fn trace_id_high_zero_omits_dd_p_tid() { + // When the upper 64 bits are zero, `_dd.p.tid` must be absent so v0.4 consumers don't + // see a redundant `"0x0"` entry. 
+ let mut tid = [0u8; 16]; + tid[8..].copy_from_slice(&42u64.to_be_bytes()); + let payload = minimal_payload(tid, minimal_span()); + let traces = encode_and_decode(&payload); + assert!(map_get(&traces[0][0], "meta").is_none()); + } + + #[test] + fn error_true_emits_one_false_omits_field() { + let payload_err = minimal_payload( + [0u8; 16], + SpanBytes { + error: true, + ..minimal_span() + }, + ); + let traces_err = encode_and_decode(&payload_err); + assert_eq!( + map_get(&traces_err[0][0], "error").unwrap().as_i64(), + Some(1) + ); + + let payload_ok = minimal_payload([0u8; 16], minimal_span()); + let traces_ok = encode_and_decode(&payload_ok); + assert!(map_get(&traces_ok[0][0], "error").is_none()); + } + + #[test] + fn string_attribute_is_routed_to_meta() { + let mut attrs: VecMap = VecMap::new(); + attrs.insert(bs("http.method"), AttributeValue::String(bs("GET"))); + let payload = minimal_payload( + [0u8; 16], + SpanBytes { + attributes: attrs, + ..minimal_span() + }, + ); + let traces = encode_and_decode(&payload); + let meta = map_get(&traces[0][0], "meta").expect("meta present"); + assert_eq!(map_get(meta, "http.method").unwrap().as_str(), Some("GET")); + } + + #[test] + fn bool_attribute_is_stringified_in_meta() { + let mut attrs: VecMap = VecMap::new(); + attrs.insert(bs("retry"), AttributeValue::Bool(true)); + attrs.insert(bs("cached"), AttributeValue::Bool(false)); + let payload = minimal_payload( + [0u8; 16], + SpanBytes { + attributes: attrs, + ..minimal_span() + }, + ); + let traces = encode_and_decode(&payload); + let meta = map_get(&traces[0][0], "meta").expect("meta present"); + assert_eq!(map_get(meta, "retry").unwrap().as_str(), Some("true")); + assert_eq!(map_get(meta, "cached").unwrap().as_str(), Some("false")); + } + + #[test] + fn float_and_int_attributes_route_to_metrics_as_f64() { + let mut attrs: VecMap = VecMap::new(); + attrs.insert(bs("duration_ms"), AttributeValue::Float(12.5)); + attrs.insert(bs("status"), AttributeValue::Int(200)); + 
let payload = minimal_payload( + [0u8; 16], + SpanBytes { + attributes: attrs, + ..minimal_span() + }, + ); + let traces = encode_and_decode(&payload); + let metrics = map_get(&traces[0][0], "metrics").expect("metrics present"); + assert_eq!( + map_get(metrics, "duration_ms").unwrap().as_f64(), + Some(12.5) + ); + // Int is cast to f64 in the v0.4 metrics map per the mapping table. + assert_eq!(map_get(metrics, "status").unwrap().as_f64(), Some(200.0)); + } + + #[test] + fn bytes_attribute_routes_to_meta_struct_as_msgpack_bin() { + let mut attrs: VecMap = VecMap::new(); + attrs.insert( + bs("blob"), + AttributeValue::Bytes(Bytes::copy_from_slice(b"\xde\xad\xbe\xef")), + ); + let payload = minimal_payload( + [0u8; 16], + SpanBytes { + attributes: attrs, + ..minimal_span() + }, + ); + let traces = encode_and_decode(&payload); + let ms = map_get(&traces[0][0], "meta_struct").expect("meta_struct present"); + assert_eq!( + map_get(ms, "blob").and_then(|v| match v { + Value::Binary(b) => Some(b.as_slice()), + _ => None, + }), + Some(b"\xde\xad\xbe\xef".as_slice()) + ); + } + + #[test] + fn list_attribute_is_flattened_into_dotted_meta_and_metrics_keys() { + let mut attrs: VecMap = VecMap::new(); + attrs.insert( + bs("ids"), + AttributeValue::List(vec![ + AttributeValue::Int(1), + AttributeValue::Int(2), + AttributeValue::String(bs("three")), + ]), + ); + let payload = minimal_payload( + [0u8; 16], + SpanBytes { + attributes: attrs, + ..minimal_span() + }, + ); + let traces = encode_and_decode(&payload); + let span = &traces[0][0]; + assert!(map_get(span, "meta_struct").is_none()); + + let metrics = map_get(span, "metrics").expect("metrics present"); + assert_eq!(map_get(metrics, "ids.0").unwrap().as_f64(), Some(1.0)); + assert_eq!(map_get(metrics, "ids.1").unwrap().as_f64(), Some(2.0)); + + let meta = map_get(span, "meta").expect("meta present"); + assert_eq!(map_get(meta, "ids.2").unwrap().as_str(), Some("three")); + } + + #[test] + fn 
keyvalue_attribute_is_flattened_into_dotted_meta_and_metrics_keys() { + let mut inner_kv: VecMap = VecMap::new(); + inner_kv.insert(bs("user_id"), AttributeValue::Int(42)); + inner_kv.insert(bs("name"), AttributeValue::String(bs("alice"))); + inner_kv.insert(bs("active"), AttributeValue::Bool(true)); + + let mut attrs: VecMap = VecMap::new(); + attrs.insert(bs("user"), AttributeValue::KeyValue(inner_kv)); + + let payload = minimal_payload( + [0u8; 16], + SpanBytes { + attributes: attrs, + ..minimal_span() + }, + ); + let traces = encode_and_decode(&payload); + let span = &traces[0][0]; + assert!(map_get(span, "meta_struct").is_none()); + + let metrics = map_get(span, "metrics").expect("metrics present"); + assert_eq!( + map_get(metrics, "user.user_id").unwrap().as_f64(), + Some(42.0) + ); + + let meta = map_get(span, "meta").expect("meta present"); + assert_eq!(map_get(meta, "user.name").unwrap().as_str(), Some("alice")); + assert_eq!(map_get(meta, "user.active").unwrap().as_str(), Some("true")); + } + + #[test] + fn nested_keyvalue_and_list_recurse_into_dotted_keys() { + // Build: {"outer": KeyValue { "items": List [String "a", KeyValue {"k": Int 1}] }} + let mut nested_kv: VecMap = VecMap::new(); + nested_kv.insert(bs("k"), AttributeValue::Int(1)); + + let mut middle_kv: VecMap = VecMap::new(); + middle_kv.insert( + bs("items"), + AttributeValue::List(vec![ + AttributeValue::String(bs("a")), + AttributeValue::KeyValue(nested_kv), + ]), + ); + + let mut attrs: VecMap = VecMap::new(); + attrs.insert(bs("outer"), AttributeValue::KeyValue(middle_kv)); + + let payload = minimal_payload( + [0u8; 16], + SpanBytes { + attributes: attrs, + ..minimal_span() + }, + ); + let traces = encode_and_decode(&payload); + let span = &traces[0][0]; + assert!(map_get(span, "meta_struct").is_none()); + + let meta = map_get(span, "meta").expect("meta present"); + assert_eq!(map_get(meta, "outer.items.0").unwrap().as_str(), Some("a")); + + let metrics = map_get(span, 
"metrics").expect("metrics present"); + assert_eq!( + map_get(metrics, "outer.items.1.k").unwrap().as_f64(), + Some(1.0) + ); + } + + #[test] + fn chunk_origin_priority_and_sampling_mechanism_propagate_to_span() { + let chunk_attrs: VecMap = VecMap::new(); + let payload = TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + priority: Some(1), + origin: bs("synthetics"), + sampling_mechanism: Some(4), + attributes: chunk_attrs, + spans: vec![minimal_span()], + ..Default::default() + }], + ..Default::default() + }; + let traces = encode_and_decode(&payload); + let span = &traces[0][0]; + + let meta = map_get(span, "meta").expect("meta carries origin + sampling_mechanism"); + assert_eq!( + map_get(meta, "_dd.origin").unwrap().as_str(), + Some("synthetics") + ); + assert_eq!( + map_get(meta, "_dd.p.dm").unwrap().as_str(), + Some("-4"), + "sampling_mechanism is encoded as `-{{n}}` per the agent's convention" + ); + + let metrics = map_get(span, "metrics").expect("metrics carries sampling_priority_v1"); + assert_eq!( + map_get(metrics, "_sampling_priority_v1").unwrap().as_f64(), + Some(1.0) + ); + } + + #[test] + fn chunk_attributes_are_propagated_to_every_span_in_chunk() { + let mut chunk_attrs: VecMap = VecMap::new(); + chunk_attrs.insert(bs("region"), AttributeValue::String(bs("us-east-1"))); + let payload = TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + attributes: chunk_attrs, + spans: vec![ + minimal_span(), + SpanBytes { + span_id: 2, + ..minimal_span() + }, + ], + ..Default::default() + }], + ..Default::default() + }; + let traces = encode_and_decode(&payload); + let trace = traces[0].as_array().expect("trace is array of spans"); + assert_eq!(trace.len(), 2); + + for span in trace { + let meta = map_get(span, "meta").expect("each span inherits chunk attrs"); + assert_eq!(map_get(meta, "region").unwrap().as_str(), Some("us-east-1")); + } + } + + #[test] + fn 
payload_env_and_app_version_are_used_when_span_leaves_them_unset() { + let payload = TracerPayloadBytes { + env: bs("prod"), + app_version: bs("2.0.0"), + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + spans: vec![minimal_span()], + ..Default::default() + }], + ..Default::default() + }; + let traces = encode_and_decode(&payload); + let meta = map_get(&traces[0][0], "meta").expect("meta present"); + assert_eq!(map_get(meta, "env").unwrap().as_str(), Some("prod")); + assert_eq!(map_get(meta, "version").unwrap().as_str(), Some("2.0.0")); + } + + #[test] + fn span_env_takes_precedence_over_payload_env() { + let payload = TracerPayloadBytes { + env: bs("prod"), + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + spans: vec![SpanBytes { + env: bs("staging"), + ..minimal_span() + }], + ..Default::default() + }], + ..Default::default() + }; + let traces = encode_and_decode(&payload); + let meta = map_get(&traces[0][0], "meta").expect("meta present"); + assert_eq!(map_get(meta, "env").unwrap().as_str(), Some("staging")); + } + + #[test] + fn payload_attributes_are_propagated_with_lowest_precedence() { + let mut payload_attrs: VecMap = VecMap::new(); + payload_attrs.insert(bs("region"), AttributeValue::String(bs("us-east-1"))); + payload_attrs.insert(bs("shared"), AttributeValue::String(bs("payload"))); + + let mut chunk_attrs: VecMap = VecMap::new(); + chunk_attrs.insert(bs("shared"), AttributeValue::String(bs("chunk"))); + + let payload = TracerPayloadBytes { + attributes: payload_attrs, + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + attributes: chunk_attrs, + spans: vec![minimal_span()], + ..Default::default() + }], + ..Default::default() + }; + let traces = encode_and_decode(&payload); + let meta = map_get(&traces[0][0], "meta").expect("meta present"); + assert_eq!(map_get(meta, "region").unwrap().as_str(), Some("us-east-1")); + // Chunk value wins over the payload's same-named attribute. 
+ assert_eq!(map_get(meta, "shared").unwrap().as_str(), Some("chunk")); + } + + #[test] + fn dropped_trace_forces_user_reject_priority() { + let payload = TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + dropped_trace: true, + spans: vec![minimal_span()], + ..Default::default() + }], + ..Default::default() + }; + let traces = encode_and_decode(&payload); + let metrics = map_get(&traces[0][0], "metrics").expect("metrics present"); + assert_eq!( + map_get(metrics, "_sampling_priority_v1").unwrap().as_f64(), + Some(-1.0) + ); + } + + #[test] + fn dropped_trace_keeps_existing_negative_priority() { + let payload = TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id: [0u8; 16], + dropped_trace: true, + priority: Some(-2), + spans: vec![minimal_span()], + ..Default::default() + }], + ..Default::default() + }; + let traces = encode_and_decode(&payload); + let metrics = map_get(&traces[0][0], "metrics").expect("metrics present"); + assert_eq!( + map_get(metrics, "_sampling_priority_v1").unwrap().as_f64(), + Some(-2.0) + ); + } + + #[test] + fn empty_payload_encodes_as_empty_top_level_array() { + let payload = TracerPayloadBytes::default(); + let traces = encode_and_decode(&payload); + assert!(traces.is_empty()); + } + + #[test] + fn multiple_chunks_become_multiple_traces() { + let payload = TracerPayloadBytes { + chunks: vec![ + TraceChunkBytes { + trace_id: [0u8; 16], + spans: vec![minimal_span()], + ..Default::default() + }, + TraceChunkBytes { + trace_id: [0u8; 16], + spans: vec![SpanBytes { + span_id: 99, + ..minimal_span() + }], + ..Default::default() + }, + ], + ..Default::default() + }; + let traces = encode_and_decode(&payload); + assert_eq!(traces.len(), 2); + assert_eq!(map_get(&traces[0][0], "span_id").unwrap().as_u64(), Some(1)); + assert_eq!( + map_get(&traces[1][0], "span_id").unwrap().as_u64(), + Some(99) + ); + } + + #[test] + fn span_link_splits_trace_id_into_low_and_high_fields() { + let mut link_tid = [0u8; 16]; 
+ link_tid[..8].copy_from_slice(&0xAAAA_BBBB_CCCC_DDDD_u64.to_be_bytes()); + link_tid[8..].copy_from_slice(&0x1111_2222_3333_4444_u64.to_be_bytes()); + + let mut link_attrs: VecMap = VecMap::new(); + link_attrs.insert(bs("link.name"), AttributeValue::String(bs("job-42"))); + link_attrs.insert(bs("link.retry"), AttributeValue::Bool(true)); + // Non-string/bool typed attrs must be dropped (v0.4 SpanLink is String→String only). + link_attrs.insert(bs("link.count"), AttributeValue::Int(5)); + + let payload = minimal_payload( + [0u8; 16], + SpanBytes { + span_links: ThinVec::from_iter([SpanLinkBytes { + trace_id: link_tid, + span_id: 7, + attributes: link_attrs, + tracestate: bs("dd=t.dm:-1"), + flags: 3, + }]), + ..minimal_span() + }, + ); + + let traces = encode_and_decode(&payload); + let links = map_get(&traces[0][0], "span_links").expect("span_links present"); + let links_arr = links.as_array().expect("span_links is array"); + assert_eq!(links_arr.len(), 1); + let link = &links_arr[0]; + + assert_eq!( + map_get(link, "trace_id").unwrap().as_u64(), + Some(0x1111_2222_3333_4444) + ); + assert_eq!( + map_get(link, "trace_id_high").unwrap().as_u64(), + Some(0xAAAA_BBBB_CCCC_DDDD) + ); + assert_eq!(map_get(link, "span_id").unwrap().as_u64(), Some(7)); + assert_eq!( + map_get(link, "tracestate").unwrap().as_str(), + Some("dd=t.dm:-1") + ); + assert_eq!(map_get(link, "flags").unwrap().as_u64(), Some(3)); + + let attrs = map_get(link, "attributes").expect("string attrs preserved"); + assert_eq!( + map_get(attrs, "link.name").unwrap().as_str(), + Some("job-42") + ); + assert_eq!(map_get(attrs, "link.retry").unwrap().as_str(), Some("true")); + // Int attr was dropped — v0.4 SpanLink schema cannot carry it. 
+ assert!(map_get(attrs, "link.count").is_none()); + } + + #[test] + fn span_event_attributes_are_downgraded_to_v04_anyvalue_shape() { + let mut event_attrs: VecMap = VecMap::new(); + event_attrs.insert(bs("kind"), AttributeValue::String(bs("exception"))); + event_attrs.insert(bs("escaped"), AttributeValue::Bool(true)); + event_attrs.insert(bs("count"), AttributeValue::Int(3)); + event_attrs.insert(bs("ratio"), AttributeValue::Float(0.75)); + + let payload = minimal_payload( + [0u8; 16], + SpanBytes { + span_events: ThinVec::from_iter([SpanEventBytes { + time_unix_nano: 1_700_000_000_000_000_000, + name: bs("oops"), + attributes: event_attrs, + }]), + ..minimal_span() + }, + ); + + let traces = encode_and_decode(&payload); + let events = map_get(&traces[0][0], "span_events").expect("span_events present"); + let events_arr = events.as_array().expect("span_events is array"); + assert_eq!(events_arr.len(), 1); + let event = &events_arr[0]; + + assert_eq!(map_get(event, "name").unwrap().as_str(), Some("oops")); + assert_eq!( + map_get(event, "time_unix_nano").unwrap().as_u64(), + Some(1_700_000_000_000_000_000) + ); + + // Each typed attribute decodes to a `{"type": <tag>, "<kind>_value": value}` map.
+ let attrs = map_get(event, "attributes").expect("event attributes present"); + let kind = map_get(attrs, "kind").unwrap(); + assert_eq!(map_get(kind, "type").unwrap().as_u64(), Some(0)); + assert_eq!( + map_get(kind, "string_value").unwrap().as_str(), + Some("exception") + ); + + let escaped = map_get(attrs, "escaped").unwrap(); + assert_eq!(map_get(escaped, "type").unwrap().as_u64(), Some(1)); + assert_eq!( + map_get(escaped, "bool_value").unwrap().as_bool(), + Some(true) + ); + + let count = map_get(attrs, "count").unwrap(); + assert_eq!(map_get(count, "type").unwrap().as_u64(), Some(2)); + assert_eq!(map_get(count, "int_value").unwrap().as_i64(), Some(3)); + + let ratio = map_get(attrs, "ratio").unwrap(); + assert_eq!(map_get(ratio, "type").unwrap().as_u64(), Some(3)); + assert_eq!(map_get(ratio, "double_value").unwrap().as_f64(), Some(0.75)); + } +} diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index 9ae268c588..d64f80e28d 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; /// Integer keys for the top-level V1 trace payload map. mod trace_key { + pub const CONTAINER_ID: u8 = 2; pub const LANGUAGE_NAME: u8 = 3; pub const LANGUAGE_VERSION: u8 = 4; pub const TRACER_VERSION: u8 = 5; @@ -109,7 +110,8 @@ pub(super) const FLAT_ATTR_STRIDE: u32 = 3; /// incrementing integer ID. On subsequent occurrences only the ID is emitted as a msgpack `uint`. /// ID 0 is reserved for the empty string (pre-inserted in the constructor). /// -/// The string table is scoped per payload: each `to_vec` / `write_to_slice` call starts with a +/// The string table is scoped per payload: each `to_vec_from_v04` / `to_vec_from_v1` (and +/// their `write_to_slice_from_v04` / `write_to_slice_from_v1` counterparts) call starts with a /// fresh table so deduplication is payload-local. 
pub(crate) struct StringTable { seen: HashMap, @@ -308,6 +310,7 @@ where /// Top-level format: /// ```text /// Map { +/// trace_key::CONTAINER_ID (2) → str|uint // optional, interned /// trace_key::ENV_REF (7) → str|uint // optional, interned /// trace_key::HOSTNAME_REF (8) → str|uint // optional, interned /// trace_key::APP_VERSION (9) → str|uint // optional, interned @@ -315,7 +318,7 @@ where /// trace_key::CHUNKS (11) → Array[Chunk, ...] /// } /// ``` -fn encode_payload]>>( +fn encode_payload_from_v04]>>( writer: &mut W, traces: &[S], metadata: &TracerMetadata, @@ -332,6 +335,7 @@ fn encode_payload]>>( + (!metadata.language_version.is_empty()) as u32 + (!metadata.tracer_version.is_empty()) as u32 + (!metadata.runtime_id.is_empty()) as u32 + + (!metadata.container_id.is_empty()) as u32 + payload_attrs.env.is_some() as u32 + payload_attrs.hostname.is_some() as u32 + payload_attrs.app_version.is_some() as u32 @@ -342,7 +346,7 @@ fn encode_payload]>>( write_uint8(writer, trace_key::CHUNKS)?; write_array_len(writer, traces.len() as u32)?; for trace in traces { - encode_chunk(writer, trace.as_ref(), &mut table)?; + encode_chunk_from_v04(writer, trace.as_ref(), &mut table)?; } if !metadata.language.is_empty() { @@ -365,6 +369,11 @@ fn encode_payload]>>( table.write_interned(writer, &metadata.runtime_id)?; } + if !metadata.container_id.is_empty() { + write_uint8(writer, trace_key::CONTAINER_ID)?; + table.write_interned(writer, &metadata.container_id)?; + } + if let Some(env) = payload_attrs.env { write_uint8(writer, trace_key::ENV_REF)?; table.write_interned(writer, env)?; @@ -411,7 +420,7 @@ fn encode_payload]>>( /// chunk_key::SPANS (4) → Array[Span, ...] /// } /// ``` -fn encode_chunk( +fn encode_chunk_from_v04( writer: &mut W, spans: &[Span], table: &mut StringTable, @@ -456,38 +465,38 @@ fn encode_chunk( /// /// # Errors /// Returns a `ValueWriteError` if the underlying writer fails. 
-pub fn write_to_slice]>>( +pub fn write_to_slice_from_v04]>>( // &mut &mut [u8] lets the caller see the slice shrink as bytes are written. slice: &mut &mut [u8], traces: &[S], metadata: &TracerMetadata, ) -> Result<(), ValueWriteError> { - encode_payload(slice, traces, metadata) + encode_payload_from_v04(slice, traces, metadata) } /// Serializes traces into a `Vec` using the V1 msgpack format. -pub fn to_vec]>>( +pub fn to_vec_from_v04]>>( traces: &[S], metadata: &TracerMetadata, ) -> Vec { - to_vec_with_capacity(traces, 0, metadata) + to_vec_with_capacity_from_v04(traces, 0, metadata) } /// Serializes traces into a `Vec` with a pre-allocated capacity. -pub fn to_vec_with_capacity]>>( +pub fn to_vec_with_capacity_from_v04]>>( traces: &[S], capacity: u32, metadata: &TracerMetadata, ) -> Vec { let mut buf = ByteBuf::with_capacity(capacity as usize); - encode_payload(&mut buf, traces, metadata) + encode_payload_from_v04(&mut buf, traces, metadata) .map_err(super::flatten_value_write_infallible) .unwrap_infallible(); buf.into_vec() } /// Returns the number of bytes the V1 payload for `traces` would occupy. -pub fn to_encoded_byte_len]>>( +pub fn to_encoded_byte_len_from_v04]>>( traces: &[S], metadata: &TracerMetadata, ) -> u32 { @@ -497,12 +506,12 @@ pub fn to_encoded_byte_len]>>( // the way we do for `ByteBuf`. In practice `CountLength::write*` only ever return // `Ok`, so the error path here is unreachable today; should `CountLength` ever grow // a fallible code path, fuzz tests on the msgpack encoded length would catch it. - let _ = encode_payload(&mut counter, traces, metadata); + let _ = encode_payload_from_v04(&mut counter, traces, metadata); counter.0 } /// Encodes a [`TracerPayload`] (V1 data model) as a V1 msgpack payload. 
-fn encode_payload_v1( +fn encode_payload_from_v1( writer: &mut W, payload: &TracerPayload, ) -> Result<(), ValueWriteError> { @@ -511,6 +520,7 @@ fn encode_payload_v1( let has_attributes = !payload.attributes.is_empty(); let map_len = 1u32 // chunks always present + + (!payload.container_id.borrow().is_empty()) as u32 + (!payload.language_name.borrow().is_empty()) as u32 + (!payload.language_version.borrow().is_empty()) as u32 + (!payload.tracer_version.borrow().is_empty()) as u32 @@ -525,7 +535,12 @@ fn encode_payload_v1( write_uint8(writer, trace_key::CHUNKS)?; write_array_len(writer, payload.chunks.len() as u32)?; for chunk in &payload.chunks { - encode_chunk_v1(writer, chunk, &mut table)?; + encode_chunk_from_v1(writer, chunk, &mut table)?; + } + + if !payload.container_id.borrow().is_empty() { + write_uint8(writer, trace_key::CONTAINER_ID)?; + table.write_interned(writer, payload.container_id.borrow())?; } if !payload.language_name.borrow().is_empty() { @@ -572,7 +587,7 @@ fn encode_payload_v1( } /// Encodes one V1 chunk (a group of spans sharing a trace ID). 
-fn encode_chunk_v1( +fn encode_chunk_from_v1( writer: &mut W, chunk: &crate::span::v1::TraceChunk, table: &mut StringTable, @@ -640,19 +655,19 @@ fn encode_chunk_v1( /// # Examples /// /// ``` -/// use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_payload_v1; +/// use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_v1; /// use libdd_trace_utils::span::v1::TracerPayloadSlice; /// /// let payload = TracerPayloadSlice { /// language_name: "rust".into(), /// ..Default::default() /// }; -/// let encoded = to_vec_from_payload_v1(&payload); +/// let encoded = to_vec_from_v1(&payload); /// /// assert!(!encoded.is_empty()); /// ``` -pub fn to_vec_from_payload_v1(payload: &TracerPayload) -> Vec { - to_vec_from_payload_with_capacity_v1(payload, 0) +pub fn to_vec_from_v1(payload: &TracerPayload) -> Vec { + to_vec_with_capacity_from_v1(payload, 0) } /// Serializes a `TracerPayload` into a vector of bytes with specified capacity. @@ -669,23 +684,23 @@ pub fn to_vec_from_payload_v1(payload: &TracerPayload) -> Vec= 1024); /// ``` -pub fn to_vec_from_payload_with_capacity_v1( +pub fn to_vec_with_capacity_from_v1( payload: &TracerPayload, capacity: u32, ) -> Vec { let mut buf = ByteBuf::with_capacity(capacity as usize); - encode_payload_v1(&mut buf, payload) + encode_payload_from_v1(&mut buf, payload) .map_err(super::flatten_value_write_infallible) .unwrap_infallible(); buf.into_vec() @@ -710,7 +725,7 @@ pub fn to_vec_from_payload_with_capacity_v1( /// # Examples /// /// ``` -/// use libdd_trace_utils::msgpack_encoder::v1::write_payload_to_slice_v1; +/// use libdd_trace_utils::msgpack_encoder::v1::write_to_slice_from_v1; /// use libdd_trace_utils::span::v1::TracerPayloadSlice; /// /// let mut buffer = vec![0u8; 1024]; @@ -719,13 +734,13 @@ pub fn to_vec_from_payload_with_capacity_v1( /// ..Default::default() /// }; /// -/// write_payload_to_slice_v1(&mut &mut buffer[..], &payload).expect("Encoding failed"); +/// write_to_slice_from_v1(&mut &mut buffer[..], 
&payload).expect("Encoding failed"); /// ``` -pub fn write_payload_to_slice_v1( +pub fn write_to_slice_from_v1( slice: &mut &mut [u8], payload: &TracerPayload, ) -> Result<(), ValueWriteError> { - encode_payload_v1(slice, payload) + encode_payload_from_v1(slice, payload) } /// Computes the number of bytes required to encode the given `TracerPayload`. @@ -744,20 +759,20 @@ pub fn write_payload_to_slice_v1( /// # Examples /// /// ``` -/// use libdd_trace_utils::msgpack_encoder::v1::to_encoded_byte_len_from_payload_v1; +/// use libdd_trace_utils::msgpack_encoder::v1::to_encoded_byte_len_from_v1; /// use libdd_trace_utils::span::v1::TracerPayloadSlice; /// /// let payload = TracerPayloadSlice { /// language_name: "rust".into(), /// ..Default::default() /// }; -/// let encoded_len = to_encoded_byte_len_from_payload_v1(&payload); +/// let encoded_len = to_encoded_byte_len_from_v1(&payload); /// /// assert!(encoded_len > 0); /// ``` -pub fn to_encoded_byte_len_from_payload_v1(payload: &TracerPayload) -> u32 { +pub fn to_encoded_byte_len_from_v1(payload: &TracerPayload) -> u32 { let mut counter = super::CountLength(0); - let _ = encode_payload_v1(&mut counter, payload); + let _ = encode_payload_from_v1(&mut counter, payload); counter.0 } @@ -791,14 +806,14 @@ mod tests { fn test_to_vec_non_empty() { let spans = vec![make_span("svc", "op", 42, 1, 0)]; let traces = vec![spans]; - let encoded = to_vec(&traces, &TracerMetadata::default()); + let encoded = to_vec_from_v04(&traces, &TracerMetadata::default()); assert!(!encoded.is_empty()); } #[test] fn test_to_vec_empty_traces() { let traces: Vec> = vec![]; - let encoded = to_vec(&traces, &TracerMetadata::default()); + let encoded = to_vec_from_v04(&traces, &TracerMetadata::default()); // Must still produce a valid msgpack map with an empty chunks array. 
assert!(!encoded.is_empty()); } @@ -814,8 +829,8 @@ mod tests { let s_single = make_span("my-service", "op1", 1, 1, 0); let traces_single = vec![vec![s_single]]; - let encoded_two = to_vec(&traces_two, &TracerMetadata::default()); - let encoded_single = to_vec(&traces_single, &TracerMetadata::default()); + let encoded_two = to_vec_from_v04(&traces_two, &TracerMetadata::default()); + let encoded_single = to_vec_from_v04(&traces_single, &TracerMetadata::default()); // The two-trace payload should be less than 2× the single-trace payload // if interning is working (the second "my-service" is encoded as an integer). @@ -850,7 +865,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![root]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![root]], &TracerMetadata::default()); assert!(!encoded.is_empty()); // The payload must contain "lambda" somewhere (the origin string). let lambda_bytes = b"lambda"; @@ -870,8 +885,8 @@ mod tests { ]; let traces = vec![spans]; let meta = TracerMetadata::default(); - let encoded = to_vec(&traces, &meta); - let len = to_encoded_byte_len(&traces, &meta); + let encoded = to_vec_from_v04(&traces, &meta); + let len = to_encoded_byte_len_from_v04(&traces, &meta); assert_eq!(encoded.len() as u32, len); } @@ -897,7 +912,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![root]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![root]], &TracerMetadata::default()); assert!(!encoded.is_empty()); } @@ -932,7 +947,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![span]], &TracerMetadata::default()); let prod_bytes = b"prod"; assert!( encoded.windows(prod_bytes.len()).any(|w| w == prod_bytes), @@ -972,7 +987,7 @@ mod tests { ..Default::default() }; - let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![span]], 
&TracerMetadata::default()); // Both attribute strings must appear in the payload bytes. let ssi_bytes = b"ssi"; @@ -1002,7 +1017,7 @@ mod tests { fn test_payload_attributes_absent_when_no_relevant_tags() { // A span with no _dd.apm_mode or _dd.git.commit.sha must not produce key 10. let span = make_span("svc", "op", 1, 1, 0); - let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![span]], &TracerMetadata::default()); let apm_key = b"_dd.apm_mode"; assert!( !encoded.windows(apm_key.len()).any(|w| w == apm_key), @@ -1020,7 +1035,7 @@ mod tests { runtime_id: "abc-123-uuid".to_string(), ..Default::default() }; - let encoded = to_vec(&[vec![span]], &metadata); + let encoded = to_vec_from_v04(&[vec![span]], &metadata); for s in &[b"python" as &[u8], b"3.11", b"2.0.0", b"abc-123-uuid"] { assert!( @@ -1034,14 +1049,14 @@ mod tests { #[test] fn test_payload_metadata_absent_when_empty() { let span = make_span("svc", "op", 1, 1, 0); - let encoded_with = to_vec( + let encoded_with = to_vec_from_v04( &[vec![span.clone()]], &TracerMetadata { language: "go".to_string(), ..Default::default() }, ); - let encoded_without = to_vec(&[vec![span]], &TracerMetadata::default()); + let encoded_without = to_vec_from_v04(&[vec![span]], &TracerMetadata::default()); // Payload with metadata must be larger (it carries extra fields). assert!(encoded_with.len() > encoded_without.len()); } @@ -1065,7 +1080,7 @@ mod tests { meta, ..Default::default() }; - let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![span]], &TracerMetadata::default()); // Expected 16-byte BE: high = 0x640cfd5400000000, low = 0x0123456789abcdef let expected = [ @@ -1088,7 +1103,7 @@ mod tests { fn test_128bit_trace_id_without_dd_p_tid() { // Absent _dd.p.tid → high 64 bits zero. 
let span = make_span("svc", "op", 0x0123456789abcdef, 1, 0); - let encoded = to_vec(&[vec![span]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![span]], &TracerMetadata::default()); let expected = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, @@ -1120,7 +1135,7 @@ mod tests { meta, ..Default::default() }; - let encoded = to_vec(&[vec![root]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![root]], &TracerMetadata::default()); // The chunk-level sampling_mechanism (key 7) must be encoded as uint 4. // The byte sequence is `chunk_key::SAMPLING_MECHANISM (0x07)` followed by the @@ -1184,7 +1199,7 @@ mod tests { meta: meta3, ..Default::default() }; - let encoded = to_vec(&[vec![s1, s2, s3]], &TracerMetadata::default()); + let encoded = to_vec_from_v04(&[vec![s1, s2, s3]], &TracerMetadata::default()); // Each attribute must be present at chunk level — collected from a different // non-root span. @@ -1210,7 +1225,7 @@ mod tests { #[cfg(test)] mod v1_payload_tests { - //! Unit tests for the v1::Span encoder (`encode_payload_v1`). + //! Unit tests for the v1::Span encoder (`encode_payload_from_v1`). //! //! Verifies the encoder produces a valid V1 payload from the canonical //! [`crate::span::v1::TracerPayload`] data model and that core invariants (interning, byte @@ -1251,7 +1266,7 @@ mod v1_payload_tests { #[test] fn empty_payload_is_valid_msgpack_map() { let payload = TracerPayloadBytes::default(); - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); // Map with a single entry (chunks), then an empty array. `0x81` = fixmap of length 1, // followed by chunk key (0x0b), then `0x90` (fixarray length 0). 
assert_eq!(encoded, vec![0x81, 0x0b, 0x90]); @@ -1264,8 +1279,8 @@ mod v1_payload_tests { chunks: vec![chunk], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); - let len = to_encoded_byte_len_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); + let len = to_encoded_byte_len_from_v1(&payload); assert_eq!(encoded.len() as u32, len); } @@ -1278,7 +1293,7 @@ mod v1_payload_tests { chunks: vec![chunk], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); let pat = [0x10u8, 0x01u8]; assert!( encoded.windows(2).any(|w| w == pat), @@ -1305,7 +1320,7 @@ mod v1_payload_tests { chunks: vec![chunk], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); // String attribute → type discriminant = 1 (`AnyValueKey::String`). assert!( encoded.windows(b"k_str".len()).any(|w| w == b"k_str"), @@ -1335,7 +1350,7 @@ mod v1_payload_tests { chunks: vec![make_chunk(vec![span], [0u8; 16])], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); // bin8 marker `0xc4` followed by length `0x02` and the bytes themselves. let want = [0xc4u8, 0x02, 0xde, 0xad]; assert!( @@ -1371,7 +1386,7 @@ mod v1_payload_tests { chunks: vec![make_chunk(vec![span], [0u8; 16])], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); // The keys and the nested key must all appear at least once. 
for s in &[b"list" as &[u8], b"kv", b"a", b"nk"] { assert!( @@ -1395,7 +1410,7 @@ mod v1_payload_tests { chunks: vec![make_chunk(vec![make_span("svc", "op", 1)], [0u8; 16])], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); for s in &[ b"python" as &[u8], b"3.11", @@ -1426,7 +1441,7 @@ mod v1_payload_tests { chunks: vec![chunk], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); assert!( encoded.windows(b"lambda".len()).any(|w| w == b"lambda"), "chunk origin should appear" @@ -1448,7 +1463,7 @@ mod v1_payload_tests { chunks: vec![chunk], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); // DROPPED_TRACE (0x05) + msgpack true marker (0xc3) let want = [chunk_key::DROPPED_TRACE, 0xc3]; assert!( @@ -1469,7 +1484,7 @@ mod v1_payload_tests { chunks: vec![chunk], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); assert!( !encoded.contains(&chunk_key::DROPPED_TRACE), "DROPPED_TRACE key should not be emitted when false" @@ -1490,7 +1505,7 @@ mod v1_payload_tests { chunks: vec![chunk], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); // ATTRIBUTES (0x03) + msgpack fixarray header for 3 elements (0x93) let want = [chunk_key::ATTRIBUTES, 0x93]; assert!( @@ -1528,7 +1543,7 @@ mod v1_payload_tests { chunks: vec![make_chunk(vec![span], [0u8; 16])], ..Default::default() }; - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); let want = [0x10u8, expected_byte]; assert!( encoded.windows(2).any(|w| w == want), @@ -1554,8 +1569,8 @@ mod v1_payload_tests { chunks: vec![make_chunk(vec![make_span("shared", "op1", 1)], [0u8; 16])], ..Default::default() }; - let two = to_vec_from_payload_v1(&chunk_with_two); - let one = 
to_vec_from_payload_v1(&single); + let two = to_vec_from_v1(&chunk_with_two); + let one = to_vec_from_v1(&single); let shared_occurrences = two .windows(b"shared".len()) .filter(|w| *w == b"shared") diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs index c1755623f6..a1b580101c 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -1,6 +1,9 @@ // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +//! Upgrade encoder: `crate::span::v04::Span` → V1 msgpack wire. +//! (Convention documented in [`crate::msgpack_encoder`].) + use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; use crate::span::TraceData; use rmp::encode::{ @@ -26,10 +29,9 @@ fn span_kind_from_str(s: &str) -> u32 { } } -/// Encodes span links into the V1 format. -/// -/// Uses integer keys and string interning for string values. Each span link's -/// trace ID is encoded as a 16-byte big-endian binary. +/// Encodes [`v04::SpanLink`](crate::span::v04::SpanLink)s into the V1 msgpack wire format +/// (upgrade: v0.4 input → V1 output). Uses integer keys and string interning for string +/// values. Each span link's trace ID is encoded as a 16-byte big-endian binary. pub fn encode_span_links( writer: &mut W, span_links: &[SpanLink], @@ -80,9 +82,9 @@ pub fn encode_span_links( Ok(()) } -/// Encodes span events into the V1 format. -/// -/// Uses integer keys and string interning. Attribute values are type-tagged. +/// Encodes [`v04::SpanEvent`](crate::span::v04::SpanEvent)s into the V1 msgpack wire format +/// (upgrade: v0.4 input → V1 output). Uses integer keys and string interning. Attribute values +/// are type-tagged. 
pub fn encode_span_events( writer: &mut W, span_events: &[SpanEvent], @@ -171,7 +173,8 @@ fn encode_attribute_any_value( Ok(()) } -/// Encodes a v0.4 span into the V1 msgpack format. +/// Encodes a [`v04::Span`](crate::span::v04::Span) into the V1 msgpack wire format +/// (upgrade: v0.4 input → V1 output). /// /// Key differences from v0.4: /// - Uses integer keys for all fields. diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v1.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v1.rs index 0fcd9a98e4..e63a68b212 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v1.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v1.rs @@ -1,6 +1,9 @@ // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +//! Native V1 span encoder: `crate::span::v1::Span` → V1 msgpack wire. +//! (Convention documented in [`crate::msgpack_encoder`].) + use crate::span::v1::{AttributeValue, Span, SpanEvent, SpanLink}; use crate::span::vec_map::VecMap; use crate::span::TraceData; @@ -108,7 +111,8 @@ pub(super) fn encode_attributes_map( Ok(()) } -/// Encodes a `SpanLink` object into a slice of bytes. +/// Encodes a [`v1::SpanLink`](crate::span::v1::SpanLink) into the V1 msgpack wire format +/// (native encoding: V1 input → V1 output). /// /// # Arguments /// @@ -168,7 +172,8 @@ pub(super) fn encode_span_links( Ok(()) } -/// Encodes a `SpanEvent` object into a slice of bytes. +/// Encodes a [`v1::SpanEvent`](crate::span::v1::SpanEvent) into the V1 msgpack wire format +/// (native encoding: V1 input → V1 output). /// /// # Arguments /// @@ -213,7 +218,8 @@ pub(super) fn encode_span_events( Ok(()) } -/// Encodes a `Span` object into a slice of bytes. +/// Encodes a [`v1::Span`](crate::span::v1::Span) into the V1 msgpack wire format +/// (native encoding: V1 input → V1 output). 
/// /// # Arguments /// diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index 53d14d8ecd..5fce403865 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -384,7 +384,7 @@ impl SendData { headers.insert(DATADOG_TRACE_COUNT, chunks.into()); headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK); - let payload = msgpack_encoder::v04::to_vec(payload); + let payload = msgpack_encoder::v04::to_vec_from_v04(payload); futures.push(self.send_payload( capabilities, @@ -534,7 +534,7 @@ mod tests { total } TracerPayloadCollection::V04(payloads) => { - msgpack_encoder::v04::to_encoded_byte_len(payloads) as usize + msgpack_encoder::v04::to_encoded_byte_len_from_v04(payloads) as usize } TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(), } diff --git a/libdd-trace-utils/src/span/v1/mod.rs b/libdd-trace-utils/src/span/v1/mod.rs index 459cbc5438..42e64f76bf 100644 --- a/libdd-trace-utils/src/span/v1/mod.rs +++ b/libdd-trace-utils/src/span/v1/mod.rs @@ -112,6 +112,7 @@ pub struct TraceChunk { /// A V1 tracer payload: tracer-level metadata and the trace chunks it carries. 
#[derive(Debug, Default)] pub struct TracerPayload { + pub container_id: T::Text, pub language_name: T::Text, pub language_version: T::Text, pub tracer_version: T::Text, diff --git a/libdd-trace-utils/src/tracer_metadata.rs b/libdd-trace-utils/src/tracer_metadata.rs index 7d67ee953d..5fe93098ab 100644 --- a/libdd-trace-utils/src/tracer_metadata.rs +++ b/libdd-trace-utils/src/tracer_metadata.rs @@ -16,6 +16,7 @@ pub struct TracerMetadata { pub language_version: String, pub language_interpreter: String, pub language_interpreter_vendor: String, + pub container_id: String, pub git_commit_sha: String, pub process_tags: String, pub client_computed_stats: bool, @@ -30,6 +31,7 @@ impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> { tracer_version: &tags.tracer_version, lang_interpreter: &tags.language_interpreter, lang_vendor: &tags.language_interpreter_vendor, + container_id: &tags.container_id, client_computed_stats: tags.client_computed_stats, client_computed_top_level: tags.client_computed_top_level, ..Default::default() diff --git a/libdd-trace-utils/tests/snapshots/compare_v04_native_and_v1_to_v04_encoders_snapshot_test.json b/libdd-trace-utils/tests/snapshots/compare_v04_native_and_v1_to_v04_encoders_snapshot_test.json new file mode 100644 index 0000000000..bcb9423a20 --- /dev/null +++ b/libdd-trace-utils/tests/snapshots/compare_v04_native_and_v1_to_v04_encoders_snapshot_test.json @@ -0,0 +1,38 @@ +[[ + { + "name": "op", + "service": "svc", + "resource": "res", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "meta": { + "env": "test-env", + "http.method": "GET", + "span.kind": "server" + }, + "metrics": { + "http.duration_ms": 12.5 + }, + "duration": 5000, + "start": 1000000 + }], +[ + { + "name": "op", + "service": "svc", + "resource": "res", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "meta": { + "env": "test-env", + "http.method": "GET", + "span.kind": "server" + }, + "metrics": { + "http.duration_ms": 12.5 + }, + "duration": 5000, + 
"start": 1000000 + }]] diff --git a/libdd-trace-utils/tests/test_send_data.rs b/libdd-trace-utils/tests/test_send_data.rs index 39a10b58ff..a3b6546426 100644 --- a/libdd-trace-utils/tests/test_send_data.rs +++ b/libdd-trace-utils/tests/test_send_data.rs @@ -396,7 +396,7 @@ mod tracing_integration_tests { // ───────────────────────── V1 integration tests ────────────────────────── // // These tests cover the v1::Span encoder end-to-end: the payload is built directly from the - // `TracerPayload` data model in Rust, encoded with `to_vec_from_payload_v1`, POSTed to the + // `TracerPayload` data model in Rust, encoded with `to_vec_from_v1`, POSTed to the // `dd-apm-test-agent`'s `/v1.0/traces`, and validated via snapshot. The test-agent is the V1 // decoder, so this exercises the full round-trip without us having to maintain one in this // crate. @@ -413,10 +413,11 @@ mod tracing_integration_tests { out } - /// POSTs a raw V1 msgpack payload to the test-agent's `/v1.0/traces` and asserts the agent - /// returns 2xx. Headers are the minimum the agent needs to attach the payload to a snapshot - /// session (`X-Datadog-Test-Session-Token` query param + `Datadog-Meta-Lang*` for routing). - async fn post_v1_payload(uri: hyper::Uri, body: Vec) { + /// POSTs a raw msgpack trace payload to the test-agent (the endpoint — `/v0.4/traces` or + /// `/v1.0/traces` — is baked into `uri` by the caller). Asserts the agent returns 2xx. + /// Headers are the minimum the agent needs to attach the payload to a snapshot session + /// (`X-Datadog-Test-Session-Token` query param + `Datadog-Meta-Lang*` for routing). 
+ async fn post_msgpack_traces(uri: hyper::Uri, body: Vec) { use libdd_capabilities_impl::HttpClientCapability; let client = NativeCapabilities::new_client(); let req = http::Request::builder() @@ -520,15 +521,16 @@ mod tracing_integration_tests { app_version: bs_v1("1.2.3"), attributes: VecMap::new(), chunks: vec![chunk], + ..Default::default() } } /// End-to-end round-trip: builds a V1 payload directly from `TracerPayload`, encodes it - /// with `to_vec_from_payload_v1`, POSTs to the test-agent, and asserts the snapshot. + /// with `to_vec_from_v1`, POSTs to the test-agent, and asserts the snapshot. #[cfg_attr(miri, ignore)] #[tokio::test] async fn compare_v1_native_trace_snapshot_test() { - use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_payload_v1; + use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_v1; let relative_snapshot_path = "libdd-trace-utils/tests/snapshots/"; let snapshot_name = "compare_send_data_v1_native_trace_snapshot_test"; @@ -541,9 +543,9 @@ mod tracing_integration_tests { test_agent.start_session(snapshot_name, None).await; let payload = make_v1_payload("test_send_data_v1_native_snapshot"); - let encoded = to_vec_from_payload_v1(&payload); + let encoded = to_vec_from_v1(&payload); - post_v1_payload(uri, encoded).await; + post_msgpack_traces(uri, encoded).await; test_agent.assert_snapshot(snapshot_name).await; } @@ -555,7 +557,7 @@ mod tracing_integration_tests { #[cfg_attr(miri, ignore)] #[tokio::test] async fn compare_v04_and_v1_encoders_snapshot_test() { - use libdd_trace_utils::msgpack_encoder::v1::{to_vec, to_vec_from_payload_v1}; + use libdd_trace_utils::msgpack_encoder::v1::{to_vec_from_v04, to_vec_from_v1}; use libdd_trace_utils::span::v04::SpanBytes as V04SpanBytes; use libdd_trace_utils::span::v1::{ AttributeValue, SpanBytes as V1SpanBytes, SpanKind, TraceChunkBytes, TracerPayloadBytes, @@ -618,13 +620,97 @@ mod tracing_integration_tests { }; // ── POST both into the same session ──────────────────────────────────────── 
- let bytes_v04 = to_vec(&v04_traces, &metadata); - let bytes_v1 = to_vec_from_payload_v1(&v1_payload); - post_v1_payload(uri.clone(), bytes_v04).await; - post_v1_payload(uri, bytes_v1).await; + let bytes_v04 = to_vec_from_v04(&v04_traces, &metadata); + let bytes_v1 = to_vec_from_v1(&v1_payload); + post_msgpack_traces(uri.clone(), bytes_v04).await; + post_msgpack_traces(uri, bytes_v1).await; // Both POSTs share trace_id=1, so the test-agent merges them into a single trace of // 2 decoded spans. The checked-in snapshot is the canonical equivalent decoded form. test_agent.assert_snapshot(snapshot_name).await; } + + /// Round-trip: the v1::Span → v0.4 downgrade encoder + /// ([`libdd_trace_utils::msgpack_encoder::v04::to_vec_from_v1`]) must produce a v0.4 + /// payload that decodes to the same canonical form as the native v0.4 encoder + /// ([`libdd_trace_utils::msgpack_encoder::v04::to_vec_from_v04`]) when fed equivalent input. + /// + /// Both encodings are POSTed to `/v0.4/traces` in the same session with distinct + /// `trace_id`s, and the checked-in snapshot records the two traces side by side. Any + /// drift in either encoder makes its trace diverge from the canonical decoded form. 
+ #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn compare_v04_native_and_v1_to_v04_encoders_snapshot_test() { + use libdd_trace_utils::msgpack_encoder::v04::to_vec_from_v04 as to_vec_v04_native; + use libdd_trace_utils::msgpack_encoder::v04::to_vec_from_v1; + use libdd_trace_utils::span::v04::SpanBytes as V04SpanBytes; + use libdd_trace_utils::span::v1::{ + AttributeValue, SpanBytes as V1SpanBytes, SpanKind, TraceChunkBytes, TracerPayloadBytes, + }; + + let relative_snapshot_path = "libdd-trace-utils/tests/snapshots/"; + let snapshot_name = "compare_v04_native_and_v1_to_v04_encoders_snapshot_test"; + let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await; + let uri = test_agent + .get_uri_for_endpoint("v0.4/traces", Some(snapshot_name)) + .await; + + test_agent.start_session(snapshot_name, None).await; + + // ── v0.4 native input — trace_id = 1 ─────────────────────────────────────── + // Note: v0.4's `meta["span.kind"]` is the downgrade target for the v1::SpanKind enum, + // so we set it explicitly here for parity with the v1::Span path below. + let mut meta_v04 = VecMap::new(); + meta_v04.insert(bs_v1("env"), bs_v1("test-env")); + meta_v04.insert(bs_v1("http.method"), bs_v1("GET")); + meta_v04.insert(bs_v1("span.kind"), bs_v1("server")); + let mut metrics_v04 = VecMap::new(); + metrics_v04.insert(bs_v1("http.duration_ms"), 12.5_f64); + let v04_traces: Vec> = vec![vec![V04SpanBytes { + service: bs_v1("svc"), + name: bs_v1("op"), + resource: bs_v1("res"), + trace_id: 1, + span_id: 1, + start: 1_000_000, + duration: 5_000, + meta: meta_v04, + metrics: metrics_v04, + ..Default::default() + }]]; + + // ── v1::Span input — trace_id = 2, semantically equivalent otherwise ─────── + // The v1 model carries `env` / `span_kind` as typed fields rather than meta strings; + // the downgrade encoder must place them back under `meta["env"]` / `meta["span.kind"]`. 
+ let mut attrs_v1 = VecMap::new(); + attrs_v1.insert(bs_v1("http.method"), AttributeValue::String(bs_v1("GET"))); + attrs_v1.insert(bs_v1("http.duration_ms"), AttributeValue::Float(12.5)); + let v1_payload = TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id: tid_bytes(0, 2), + spans: vec![V1SpanBytes { + service: bs_v1("svc"), + name: bs_v1("op"), + resource: bs_v1("res"), + span_id: 1, + start: 1_000_000, + duration: 5_000, + span_kind: SpanKind::Server, + env: bs_v1("test-env"), + attributes: attrs_v1, + ..Default::default() + }], + ..Default::default() + }], + ..Default::default() + }; + + // ── Encode each via its respective encoder and POST both ─────────────────── + let bytes_v04 = to_vec_v04_native(&v04_traces); + let bytes_v1_to_v04 = to_vec_from_v1(&v1_payload); + post_msgpack_traces(uri.clone(), bytes_v04).await; + post_msgpack_traces(uri, bytes_v1_to_v04).await; + + test_agent.assert_snapshot(snapshot_name).await; + } }