Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,54 @@ pub unsafe extern "C" fn ddog_sidecar_send_trace_v04_bytes(
MaybeError::None
}

/// Sends a V1-encoded trace to the sidecar via shared memory. The sidecar decodes the V1
/// `TracerPayload`, can inspect it, and re-encodes it as V1 msgpack on the way to the agent's
/// `/v1.0/traces` endpoint.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_send_trace_v1_shm(
transport: &mut Box<SidecarTransport>,
instance_id: &InstanceId,
shm_handle: Box<ShmHandle>,
len: usize,
tracer_header_tags: &TracerHeaderTags,
) -> MaybeError {
let tracer_header_tags = try_c!(tracer_header_tags.try_into());

try_c!(blocking::send_trace_v1_shm(
transport,
instance_id,
*shm_handle,
len,
tracer_header_tags,
));

MaybeError::None
}

/// Sends a V1-encoded trace as bytes to the sidecar. The sidecar decodes the V1 `TracerPayload`,
/// can inspect it, and re-encodes it as V1 msgpack on the way to the agent's `/v1.0/traces`
/// endpoint.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_send_trace_v1_bytes(
transport: &mut Box<SidecarTransport>,
instance_id: &InstanceId,
data: ffi::CharSlice,
tracer_header_tags: &TracerHeaderTags,
) -> MaybeError {
let tracer_header_tags = try_c!(tracer_header_tags.try_into());

try_c!(blocking::send_trace_v1_bytes(
transport,
instance_id,
data.as_bytes().to_vec(),
tracer_header_tags,
));

MaybeError::None
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
#[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals
Expand Down
25 changes: 25 additions & 0 deletions datadog-sidecar/src/service/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,31 @@ pub fn send_trace_v04_shm(
Ok(())
}

/// Sends a V1-encoded trace as bytes. The sidecar decodes the V1 payload, can inspect it, and
/// re-encodes it as V1 msgpack on the way to the agent's `/v1.0/traces` endpoint.
pub fn send_trace_v1_bytes(
transport: &mut SidecarTransport,
instance_id: &InstanceId,
data: Vec<u8>,
headers: SerializedTracerHeaderTags,
) -> io::Result<()> {
lock_sender(transport)?.send_trace_v1_bytes(instance_id.clone(), data, headers);
Ok(())
}

/// Sends a V1-encoded trace via shared memory. The sidecar decodes the V1 payload, can inspect
/// it, and re-encodes it as V1 msgpack on the way to the agent's `/v1.0/traces` endpoint.
pub fn send_trace_v1_shm(
transport: &mut SidecarTransport,
instance_id: &InstanceId,
handle: ShmHandle,
len: usize,
headers: SerializedTracerHeaderTags,
) -> io::Result<()> {
lock_sender(transport)?.send_trace_v1_shm(instance_id.clone(), handle, len, headers);
Ok(())
}

/// Sends raw data from shared memory to the debugger endpoint.
pub fn send_debugger_data_shm(
transport: &mut SidecarTransport,
Expand Down
27 changes: 27 additions & 0 deletions datadog-sidecar/src/service/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,33 @@ impl SidecarSender {
.try_send_send_trace_v04_bytes(instance_id, data, headers);
}

pub fn send_trace_v1_shm(
&mut self,
instance_id: InstanceId,
handle: ShmHandle,
len: usize,
headers: SerializedTracerHeaderTags,
) {
if !self.try_drain_outbox() {
return;
}
self.channel
.try_send_send_trace_v1_shm(instance_id, handle, len, headers);
}

pub fn send_trace_v1_bytes(
&mut self,
instance_id: InstanceId,
data: Vec<u8>,
headers: SerializedTracerHeaderTags,
) {
if !self.try_drain_outbox() {
return;
}
self.channel
.try_send_send_trace_v1_bytes(instance_id, data, headers);
}

pub fn send_debugger_data_shm(
&mut self,
instance_id: InstanceId,
Expand Down
32 changes: 32 additions & 0 deletions datadog-sidecar/src/service/sidecar_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,38 @@ pub trait SidecarInterface {
headers: SerializedTracerHeaderTags,
);

/// Sends a V1-encoded trace via shared memory. The sidecar decodes the V1 `TracerPayload`,
/// can inspect it, and re-encodes it as V1 msgpack on the way to the agent's
/// `/v1.0/traces` endpoint. Use this when the SDK speaks V1 natively.
///
/// # Arguments
///
/// * `instance_id` - The ID of the instance.
/// * `handle` - The handle to the shared memory.
/// * `len` - The size of the shared memory data.
/// * `headers` - The serialized headers from the tracer.
async fn send_trace_v1_shm(
instance_id: InstanceId,
#[SerializedHandle] handle: ShmHandle,
len: usize,
headers: SerializedTracerHeaderTags,
);

/// Sends a V1-encoded trace as bytes. The sidecar decodes the V1 `TracerPayload`, can
/// inspect it, and re-encodes it as V1 msgpack on the way to the agent's `/v1.0/traces`
/// endpoint. Use this when the SDK speaks V1 natively.
///
/// # Arguments
///
/// * `instance_id` - The ID of the instance.
/// * `data` - The V1 trace data serialized as bytes.
/// * `headers` - The serialized headers from the tracer.
async fn send_trace_v1_bytes(
instance_id: InstanceId,
data: Vec<u8>,
headers: SerializedTracerHeaderTags,
);

/// Transfers raw data to a live-debugger endpoint.
///
/// # Arguments
Expand Down
83 changes: 82 additions & 1 deletion datadog-sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,30 @@ impl SidecarServer {
data: tinybytes::Bytes,
target: &Endpoint,
retry_interval: u64,
) {
self.send_trace(headers, data, target, retry_interval, TraceEncoding::V04)
}

/// Entry point for the V1 trace path. Input bytes are a V1 msgpack `TracerPayload` from the
/// SDK; the [`TraceEncoding::V1`] tag drives [`decode_to_trace_chunks`] to the V1 decoder,
/// and [`SendData`] then re-encodes the same shape as V1 on the wire to the agent.
fn send_trace_v1(
&self,
headers: &SerializedTracerHeaderTags,
data: tinybytes::Bytes,
target: &Endpoint,
retry_interval: u64,
) {
self.send_trace(headers, data, target, retry_interval, TraceEncoding::V1)
}

fn send_trace(
&self,
headers: &SerializedTracerHeaderTags,
data: tinybytes::Bytes,
target: &Endpoint,
retry_interval: u64,
encoding: TraceEncoding,
) {
let headers: TracerHeaderTags = match headers.try_into() {
Ok(headers) => headers,
Expand All @@ -265,7 +289,7 @@ impl SidecarServer {
headers
);

match decode_to_trace_chunks(data, TraceEncoding::V04) {
match decode_to_trace_chunks(data, encoding) {
Ok((payload, size)) => {
trace!("Parsed the trace payload and enqueuing it for sending: {payload:?}");
let mut data = SendData::new(
Expand Down Expand Up @@ -898,6 +922,63 @@ impl SidecarInterface for ConnectionSidecarHandler {
}
}

async fn send_trace_v1_shm(
&self,
_peer: PeerCredentials,
instance_id: InstanceId,
handle: ShmHandle,
_len: usize,
headers: SerializedTracerHeaderTags,
) {
self.track_instance(&instance_id);
let session = self.server.get_session(&instance_id.session_id);
let trace_config = session.get_trace_config();
if let Some(endpoint) = trace_config.endpoint.clone() {
let server = self.server.clone();
let retry_interval = trace_config.retry_interval;
tokio::spawn(async move {
match handle.map() {
Ok(mapped) => {
let bytes = tinybytes::Bytes::from(mapped);
server.send_trace_v1(&headers, bytes, &endpoint, retry_interval);
}
Err(e) => error!("Failed mapping shared trace data memory: {}", e),
}
});
} else {
warn!(
"Received trace data ({handle:?}) for missing session {}",
instance_id.session_id
);
}
}

async fn send_trace_v1_bytes(
&self,
_peer: PeerCredentials,
instance_id: InstanceId,
data: Vec<u8>,
headers: SerializedTracerHeaderTags,
) {
self.track_instance(&instance_id);
let session = self.server.get_session(&instance_id.session_id);
let trace_config = session.get_trace_config();

if let Some(endpoint) = trace_config.endpoint.clone() {
let server = self.server.clone();
let retry_interval = trace_config.retry_interval;
tokio::spawn(async move {
let bytes = tinybytes::Bytes::from(data);
server.send_trace_v1(&headers, bytes, &endpoint, retry_interval);
});
} else {
warn!(
"Received trace data for missing session {}",
instance_id.session_id
);
}
}

async fn send_debugger_data_shm(
&self,
_peer: PeerCredentials,
Expand Down
60 changes: 47 additions & 13 deletions libdd-data-pipeline/src/trace_exporter/trace_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use libdd_trace_utils::msgpack_encoder;
use libdd_trace_utils::span::{v04::Span, TraceData};
use libdd_trace_utils::trace_utils::{self, TracerHeaderTags};
use libdd_trace_utils::tracer_metadata::TracerMetadata;
use libdd_trace_utils::tracer_payload::{self, TraceEncoding};
use libdd_trace_utils::tracer_payload::{self};

/// Minimal capacity of fresh buffers allocated to encode traces, in bytes.
const MIN_BUFFER_CAPACITY: usize = 1024;
Expand Down Expand Up @@ -59,7 +59,7 @@ impl TraceSerializer {
let chunks = payload.size();
let headers =
self.build_traces_headers(header_tags, chunks, agent_payload_response_version);
let mp_payload = self.serialize_payload(&payload, metadata)?;
let mp_payload = self.serialize_payload(&payload, metadata, output_format)?;

Ok(PreparedTracesPayload {
data: mp_payload,
Expand All @@ -78,12 +78,19 @@ impl TraceSerializer {
TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string()))
};
match output_format {
TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)),
TraceExporterOutputFormat::V04 => {
trace_utils::collect_trace_chunks(traces, TraceEncoding::V04).map_err(map_err)
// v0.4 input spans are kept as-is in `TraceChunks::V04`. Whether they go out as v0.4
// or are cross-encoded into V1 on the wire is decided in `serialize_payload`.
//
// APMSP-2812 - TODO: when the data-pipeline gains a V1-native input model (its own
// `v1::Span`-shaped builder), route `OutputFormat::V1` to
// `TraceChunks::V1(v1::TracerPayload)` instead and serialize via
// `to_vec_from_payload_v1`. A `StatSpan` impl on `v1::Span<T>` will also be needed
// if client-side stats are enabled on the V1-native path.
TraceExporterOutputFormat::V04 | TraceExporterOutputFormat::V1 => {
Ok(tracer_payload::TraceChunks::V04(traces))
}
TraceExporterOutputFormat::V05 => {
trace_utils::collect_trace_chunks(traces, TraceEncoding::V05).map_err(map_err)
trace_utils::convert_trace_chunks_v04_to_v05(traces).map_err(map_err)
}
}
}
Expand Down Expand Up @@ -113,23 +120,42 @@ impl TraceSerializer {
&self,
payload: &tracer_payload::TraceChunks<T>,
metadata: &TracerMetadata,
output_format: TraceExporterOutputFormat,
) -> Result<Vec<u8>, TraceExporterError> {
let capacity = self
.previous_serialised_len
.load(Ordering::Relaxed)
.max(MIN_BUFFER_CAPACITY);
let buff = match payload {
tracer_payload::TraceChunks::V04(p) => {
let buff = match (payload, output_format) {
(tracer_payload::TraceChunks::V04(p), TraceExporterOutputFormat::V04) => {
msgpack_encoder::v04::to_vec_with_capacity(p, capacity as u32)
}
tracer_payload::TraceChunks::V05(p) => {
// v0.4 spans cross-encoded as V1 on the wire — used when the agent advertises
// /v1.0/traces. Same in-memory shape as the v0.4 native path, different encoder.
(tracer_payload::TraceChunks::V04(p), TraceExporterOutputFormat::V1) => {
msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata)
}
(tracer_payload::TraceChunks::V05(p), TraceExporterOutputFormat::V05) => {
let mut buff = Vec::with_capacity(capacity);
rmp_serde::encode::write(&mut buff, p)
.map_err(TraceExporterError::Serialization)?;
buff
}
tracer_payload::TraceChunks::V1(p) => {
msgpack_encoder::v1::to_vec_with_capacity(p, capacity as u32, metadata)
// APMSP-2812 - TODO: native V1 input — call `msgpack_encoder::v1::to_vec_from_payload_v1`
// on the carried `v1::TracerPayload`. Not yet reachable: `collect_and_process_traces`
// never produces `TraceChunks::V1` in the current data-pipeline path.
(tracer_payload::TraceChunks::V1(_), TraceExporterOutputFormat::V1) => {
todo!("Native V1 input serialization not yet implemented (APMSP-2812)")
}
// `collect_and_process_traces` only produces (V04, V04|V1), (V05, V05),
// or (V1, V1) — any other combination here is a programming error.
_ => {
return Err(TraceExporterError::Deserialization(
DecodeError::InvalidFormat(
"Unsupported (TraceChunks, OutputFormat) combination for serialization"
.to_owned(),
),
));
}
};
self.previous_serialised_len
Expand Down Expand Up @@ -277,7 +303,11 @@ mod tests {
.collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V04)
.unwrap();

let result = serializer.serialize_payload(&payload, &TracerMetadata::default());
let result = serializer.serialize_payload(
&payload,
&TracerMetadata::default(),
TraceExporterOutputFormat::V04,
);
assert!(result.is_ok());

let serialized = result.unwrap();
Expand Down Expand Up @@ -312,7 +342,11 @@ mod tests {
.collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V05)
.unwrap();

let result = serializer.serialize_payload(&payload, &TracerMetadata::default());
let result = serializer.serialize_payload(
&payload,
&TracerMetadata::default(),
TraceExporterOutputFormat::V05,
);
assert!(result.is_ok());

let serialized = result.unwrap();
Expand Down
1 change: 1 addition & 0 deletions libdd-trace-utils/src/msgpack_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
pub mod decode;
pub mod v04;
pub mod v05;
pub mod v1;
Loading
Loading