Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions libdd-data-pipeline/src/trace_exporter/trace_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use libdd_trace_utils::msgpack_encoder;
use libdd_trace_utils::span::{v04::Span, TraceData};
use libdd_trace_utils::trace_utils::{self, TracerHeaderTags};
use libdd_trace_utils::tracer_metadata::TracerMetadata;
use libdd_trace_utils::tracer_payload::{self, TraceEncoding};
use libdd_trace_utils::tracer_payload::{self};

/// Minimal capacity of fresh buffers allocated to encode traces, in bytes.
const MIN_BUFFER_CAPACITY: usize = 1024;
Expand Down Expand Up @@ -59,7 +59,7 @@ impl TraceSerializer {
let chunks = payload.size();
let headers =
self.build_traces_headers(header_tags, chunks, agent_payload_response_version);
let mp_payload = self.serialize_payload(&payload, metadata)?;
let mp_payload = self.serialize_payload(&payload, metadata, output_format)?;

Ok(PreparedTracesPayload {
data: mp_payload,
Expand All @@ -78,12 +78,19 @@ impl TraceSerializer {
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
};
match output_format {
TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)),
TraceExporterOutputFormat::V04 => {
trace_utils::collect_trace_chunks(traces, TraceEncoding::V04).map_err(map_err)
// v0.4 input spans are kept as-is in `TraceChunks::V04`. Whether they go out as v0.4
// or are cross-encoded into V1 on the wire is decided in `serialize_payload`.
//
// APMSP-2812 - TODO: when the data-pipeline gains a V1-native input model (its own
// `v1::Span`-shaped builder), route `OutputFormat::V1` to
// `TraceChunks::V1(v1::TracerPayload)` instead and serialize via
// `to_vec_from_payload_v1`. A `StatSpan` impl on `v1::Span<T>` will also be needed
// if client-side stats are enabled on the V1-native path.
TraceExporterOutputFormat::V04 | TraceExporterOutputFormat::V1 => {
Ok(tracer_payload::TraceChunks::V04(traces))
}
TraceExporterOutputFormat::V05 => {
trace_utils::collect_trace_chunks(traces, TraceEncoding::V05).map_err(map_err)
trace_utils::convert_trace_chunks_v04_to_v05(traces).map_err(map_err)
}
}
}
Expand Down Expand Up @@ -113,23 +120,44 @@ impl TraceSerializer {
&self,
payload: &tracer_payload::TraceChunks<T>,
metadata: &TracerMetadata,
output_format: TraceExporterOutputFormat,
) -> Result<Vec<u8>, TraceExporterError> {
let capacity = self
.previous_serialised_len
.load(Ordering::Relaxed)
.max(MIN_BUFFER_CAPACITY);
let buff = match payload {
tracer_payload::TraceChunks::V04(p) => {
let buff = match (payload, output_format) {
(tracer_payload::TraceChunks::V04(p), TraceExporterOutputFormat::V04) => {
msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32)
}
tracer_payload::TraceChunks::V05(p) => {
// v0.4 spans cross-encoded as V1 on the wire (used when the agent advertises
// /v1.0/traces).
(tracer_payload::TraceChunks::V04(p), TraceExporterOutputFormat::V1) => {
msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata)
}
(tracer_payload::TraceChunks::V05(p), TraceExporterOutputFormat::V05) => {
let mut buff = Vec::with_capacity(capacity);
rmp_serde::encode::write(&mut buff, p)
.map_err(TraceExporterError::Serialization)?;
buff
}
tracer_payload::TraceChunks::V1(p) => {
msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata)
// APMSP-2812 - TODO: native V1 input — call
// `msgpack_encoder::v1::to_vec_from_payload_v1` on the carried
// `v1::TracerPayload`. Not yet reachable: `collect_and_process_traces`
// never produces `TraceChunks::V1` in the current data-pipeline path.
#[allow(clippy::unimplemented)]
(tracer_payload::TraceChunks::V1(_), TraceExporterOutputFormat::V1) => {
unimplemented!("Native V1 input serialization not yet implemented (APMSP-2812)")
}
// `collect_and_process_traces` only produces (V04, V04|V1), (V05, V05),
// or (V1, V1) — any other combination here is a programming error.
_ => {
return Err(TraceExporterError::Deserialization(
DecodeError::InvalidFormat(
"Unsupported (TraceChunks, OutputFormat) combination for serialization"
.to_owned(),
),
));
}
};
self.previous_serialised_len
Expand Down Expand Up @@ -277,7 +305,11 @@ mod tests {
.collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V04)
.unwrap();

let result = serializer.serialize_payload(&payload, &TracerMetadata::default());
let result = serializer.serialize_payload(
&payload,
&TracerMetadata::default(),
TraceExporterOutputFormat::V04,
);
assert!(result.is_ok());

let serialized = result.unwrap();
Expand Down Expand Up @@ -312,7 +344,11 @@ mod tests {
.collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V05)
.unwrap();

let result = serializer.serialize_payload(&payload, &TracerMetadata::default());
let result = serializer.serialize_payload(
&payload,
&TracerMetadata::default(),
TraceExporterOutputFormat::V05,
);
assert!(result.is_ok());

let serialized = result.unwrap();
Expand Down
1 change: 1 addition & 0 deletions libdd-trace-utils/src/msgpack_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
pub mod decode;
pub mod v04;
pub mod v05;
pub mod v1;
Loading
Loading