diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index b0f46caa97..8c2235092d 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -1,66 +1,95 @@ // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! Implementation of the publisher part of the [OTEL process -//! context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719) +//! Implementation of the publisher and same-process reader parts of the [OTEL process +//! context specification](https://github.com/open-telemetry/opentelemetry-specification/pull/4719). //! -//! # A note on race conditions +//! The update/read protocol is seqlock-style: the publisher marks the mapping as unavailable, +//! writes the payload metadata, publishes a non-zero version, and readers accept a copy only if +//! the version they observed before copying still matches afterward. The general algorithm and +//! the C++ memory-model constraints are described in Boehm's +//! [Can Seqlocks Get Along With Programming Language Memory Models?](https://web.archive.org/web/20211106170334/https://www.hpl.hp.com/techreports/2012/HPL-2012-68.pdf). +//! Linux has its own [seqlock/seqcount implementation](https://github.com/torvalds/linux/blob/master/include/linux/seqlock.h), +//! but its barriers are specified by the Linux kernel memory model, not by the C++/Rust models. //! -//! Process context sharing implies concurrently writing to a memory area that another process -//! might be actively reading. However, reading isn't done as direct memory accesses but goes -//! through the OS, so the Rust definition of race conditions doesn't really apply. We also use -//! atomics and fences, see MappingHeader's documentation. +//! This implementation differs from the usual odd/even counter form in two ways: `0` is the +//! in-progress sentinel, and each non-zero `monotonic_published_at_ns` value is the +//! reader-visible version. Updates force that timestamp to advance so readers can detect torn +//! reads even when the clock returns the same value twice. Concurrent writers are rejected, and +//! retry policy is left to the reader's caller. +//! +//! Process context sharing also crosses Rust's memory model boundary. In-process header fields +//! that can change during publication are atomics, while payload bytes are copied with +//! `process_vm_readv`; that syscall turns accesses to reclaimed payload memory that has been +//! unmapped into a syscall error or short read instead of a segfault, but its ordering relative to +//! the publisher has to be reasoned about at the OS/architecture level rather than only in Rust. #[cfg(target_os = "linux")] #[cfg(target_has_atomic = "64")] pub mod linux { use std::{ ffi::{c_void, CStr}, - mem::ManuallyDrop, + io, + mem::{size_of, ManuallyDrop}, os::fd::{AsRawFd, FromRawFd, OwnedFd}, - ptr::{self, addr_of_mut}, + ptr::{self, NonNull}, sync::{ - atomic::{fence, AtomicU64, Ordering}, + atomic::{fence, AtomicPtr, AtomicU32, AtomicU64, Ordering}, Mutex, MutexGuard, }, time::Duration, }; - use anyhow::Context; - - use libdd_trace_protobuf::opentelemetry::proto::common::v1::ProcessContext; + use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + any_value, AnyValue, KeyValue, ProcessContext, + }; use prost::Message; + mod self_reader; + pub use self_reader::ProcessContextSelfReader; + /// Current version of the process context format pub const PROCESS_CTX_VERSION: u32 = 2; /// Signature bytes for identifying process context mappings pub const SIGNATURE: &[u8; 8] = b"OTEL_CTX"; /// The discoverable name of the memory mapping. pub const MAPPING_NAME: &CStr = c"OTEL_CTX"; + /// Sentinel timestamp indicating that the context is unpublished or being updated. + const UNPUBLISHED_OR_UPDATING: u64 = 0; /// The header structure written at the start of the mapping. This must match the C /// layout of the specification. /// - /// # Atomic accesses - /// - /// The publishing protocol requires some form of synchronization. Using fences or any non-OS - /// based synchronization requires the use of atomics to have any effect (see [Mandatory - /// atomic](https://doc.rust-lang.org/std/sync/atomic/fn.fence.html#mandatory-atomic)) - /// - /// We use `monotonic_published_at_ns` for synchronization with the reader. Ideally, it should - /// be an `AtomicU64`, but this is incompatible with `#[repr(C, packed)]` by default, as it - /// could be misaligned. In our case, given the page size and the layout of `MappingHeader`, it - /// is actually 8-bytes aligned: we use [`AtomicU64::from_ptr`] to create an atomic view when - /// synchronization is needed. - #[repr(C, packed)] + /// The seqlock algorithm is inherently racy, so fields that can change during an + /// update must be atomic (even if accessed relaxed); otherwise we hit UB. `signature` + /// and `version` are intentionally plain fields; see the read-side synchronization comment in + /// [`ProcessContextSelfReader::read`] for why their accesses are race-free. + #[repr(C)] struct MappingHeader { signature: [u8; 8], version: u32, - payload_size: u32, - monotonic_published_at_ns: u64, - payload_ptr: *const u8, + payload_size: AtomicU32, + monotonic_published_at_ns: AtomicU64, + payload_ptr: AtomicPtr, } + // Compile-time verification that MappingHeader matches the field offsets and total size + // mandated by the OTel process context spec: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/oteps/profiles/4719-process-ctx.md + const _: () = { + use std::mem::{offset_of, size_of}; + assert!(offset_of!(MappingHeader, signature) == 0); + assert!(offset_of!(MappingHeader, version) == 8); + assert!(offset_of!(MappingHeader, payload_size) == 12); + assert!(offset_of!(MappingHeader, monotonic_published_at_ns) == 16); + assert!(offset_of!(MappingHeader, payload_ptr) == 24); + assert!(size_of::() == 32); + assert!(core::mem::align_of::() == 8); + assert!(core::mem::align_of::() == core::mem::align_of::()); + assert!(core::mem::align_of::>() == core::mem::align_of::<*mut u8>()); + assert!(size_of::<*mut u8>() == size_of::()); + }; + /// The shared memory mapped area to publish the context to. The memory region is owned by a /// [MemMapping] instance and is automatically unmapped upon drop. /// @@ -72,10 +101,10 @@ pub mod linux { /// - once `self` has been dropped, no memory access must be performed on the memory previously /// pointed to by `start`. struct MemMapping { - start_addr: *mut c_void, + start_addr: NonNull, } - // Safety: MemMapping represents ownership over the mapped region. It never leaks or + // SAFETY: MemMapping represents ownership over the mapped region. It never leaks or // share the internal pointer. It's also safe to drop (`munmap`) from a different thread. unsafe impl Send for MemMapping {} @@ -91,20 +120,27 @@ pub mod linux { /// /// `memfd` is the preferred method, but this function fallbacks to an anonymous mapping if /// `memfd` failed for any reason. - fn new() -> anyhow::Result { + /// + /// Both allocation paths produce zero-filled memory: `MAP_ANONYMOUS` mappings are + /// initialized to zero, and the memfd path maps a newly-created file extended by + /// `ftruncate()`, whose extended bytes read as `\0`. This matters because a memfd-backed + /// mapping is discoverable before `set_name()` runs, so early readers may race with header + /// initialization. They must observe [`UNPUBLISHED_OR_UPDATING`] (0) and stop until the + /// final timestamp store publishes the initialized header. + fn new() -> io::Result { let size = mapping_size(); try_memfd(MAPPING_NAME, libc::MFD_CLOEXEC | libc::MFD_NOEXEC_SEAL | libc::MFD_ALLOW_SEALING) .or_else(|_| try_memfd(MAPPING_NAME, libc::MFD_CLOEXEC | libc::MFD_ALLOW_SEALING)) .and_then(|fd| { - // Safety: fd is a valid open file descriptor. + // SAFETY: fd is a valid open file descriptor. check_syscall_retval( unsafe { libc::ftruncate(fd.as_raw_fd(), mapping_size() as libc::off_t) }, "ftruncate failed" )?; - // Safety: we pass a null pointer to mmap which is unconditionally ok + // SAFETY: we pass a null pointer to mmap which is unconditionally ok let start_addr = check_mapping_addr( unsafe { libc::mmap( @@ -124,7 +160,7 @@ pub mod linux { }) // If any previous step failed, we fallback to an anonymous mapping .or_else(|_| { - // Safety: we pass a null pointer to mmap, no precondition to uphold + // SAFETY: we pass a null pointer to mmap, no precondition to uphold let start_addr = check_mapping_addr( unsafe { libc::mmap( @@ -144,15 +180,15 @@ pub mod linux { } /// Makes this mapping discoverable by giving it a name. - fn set_name(&mut self) -> anyhow::Result<()> { - // Safety: self.start_addr is valid for mapping_size() bytes as per MemMapping + fn set_name(&mut self) -> io::Result<()> { + // SAFETY: self.start_addr is valid for mapping_size() bytes as per MemMapping // invariants. name is a valid NUL-terminated string that outlives the prctl call. check_syscall_retval( unsafe { libc::prctl( libc::PR_SET_VMA, libc::PR_SET_VMA_ANON_NAME as libc::c_ulong, - self.start_addr as libc::c_ulong, + self.start_addr.as_ptr() as libc::c_ulong, mapping_size() as libc::c_ulong, MAPPING_NAME.as_ptr() as libc::c_ulong, ) @@ -165,8 +201,8 @@ pub mod linux { /// Unmaps the underlying memory region. This has same effect as dropping `self`, but /// propagates potential errors. - fn free(mut self) -> anyhow::Result<()> { - // Safety: We put `self` in a `ManuallyDrop`, which prevents drop and future calls to + fn free(mut self) -> io::Result<()> { + // SAFETY: We put `self` in a `ManuallyDrop`, which prevents drop and future calls to // `free()`. unsafe { self.unmap()?; @@ -187,10 +223,10 @@ pub mod linux { /// /// Practically, `self` must be put in a `ManuallyDrop` wrapper and forgotten, or being in /// the process of being dropped. - unsafe fn unmap(&mut self) -> anyhow::Result<()> { + unsafe fn unmap(&mut self) -> io::Result<()> { check_syscall_retval( - // Safety: upheld by the caller. - unsafe { libc::munmap(self.start_addr, mapping_size()) }, + // SAFETY: upheld by the caller. + unsafe { libc::munmap(self.start_addr.as_ptr(), mapping_size()) }, "munmap failed when freeing the process context", )?; @@ -200,7 +236,7 @@ pub mod linux { impl Drop for MemMapping { fn drop(&mut self) { - // Safety: `self` is being dropped + // SAFETY: `self` is being dropped let _ = unsafe { self.unmap() }; } } @@ -209,7 +245,7 @@ pub mod linux { struct ProcessContextHandle { mapping: MemMapping, /// Once published, and until the next update is complete, the backing allocation of - /// `payload` might be read by external processes and thus most not move (e.g. by resizing + /// `payload` might be read by external processes and thus must not move (e.g. by resizing /// or drop). #[allow(unused)] payload: Vec, @@ -220,50 +256,42 @@ pub mod linux { impl ProcessContextHandle { /// Initial publication of the process context. Creates an appropriate memory mapping. - fn publish(payload: Vec) -> anyhow::Result { + fn publish(payload: Vec) -> io::Result { + let payload_size: u32 = payload + .len() + .try_into() + .map_err(|_| io::Error::other("payload size overflowed"))?; + let mut mapping = MemMapping::new()?; let size = mapping_size(); - check_syscall_retval( - // Safety: the invariants of MemMapping ensures `start_addr` is not null and comes + // SAFETY: the invariants of MemMapping ensures `start_addr` is not null and comes // from a previous call to `mmap` - unsafe { libc::madvise(mapping.start_addr, size, libc::MADV_DONTFORK) }, + unsafe { libc::madvise(mapping.start_addr.as_ptr(), size, libc::MADV_DONTFORK) }, "madvise MADVISE_DONTFORK failed", )?; let published_at_ns = since_boottime_ns().ok_or_else(|| { - anyhow::anyhow!("failed to get current time for process context publication") + io::Error::other("failed to get current time for process context publication") })?; - let header = mapping.start_addr as *mut MappingHeader; + let header = mapping.start_addr.as_ptr() as *mut MappingHeader; + // SAFETY: header points to a zero-filled, page-aligned mapping of at least + // mapping_size() bytes; field projections are in-bounds and aligned. unsafe { - // Safety: MappingHeader is packed, thus have no alignment requirement. It points - // to a freshly mmaped region which is valid for writing at least `mapping_size()`, - // which we make sure is greater than the size of MappingHeader. - ptr::write( - header, - MappingHeader { - signature: *SIGNATURE, - version: PROCESS_CTX_VERSION, - payload_size: payload - .len() - .try_into() - .context("payload size overflowed")?, - // will be set atomically at last - monotonic_published_at_ns: 0, - payload_ptr: payload.as_ptr(), - }, - ); - // We typically want to avoid the compiler and the hardware to re-order the write - // to the `monotonic_published_at_ns` (which should be last according to the - // specification) with the writes to other fields of the header. - // - // To do so, we implement synchronization during publication _as if the reader were - // another thread of this program_, using atomics and fences. - fence(Ordering::SeqCst); - AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) - .store(published_at_ns, Ordering::Relaxed); + ptr::addr_of_mut!((*header).signature).write(*SIGNATURE); + ptr::addr_of_mut!((*header).version).write(PROCESS_CTX_VERSION); + (*header) + .payload_size + .store(payload_size, Ordering::Relaxed); + (*header) + .payload_ptr + .store(payload.as_ptr().cast_mut(), Ordering::Relaxed); + + (*header) + .monotonic_published_at_ns + .store(published_at_ns, Ordering::Release); } // Note that naming must be unconditionally attempted, even on kernels where we might @@ -275,54 +303,63 @@ pub mod linux { Ok(ProcessContextHandle { mapping, payload, - // Safety: getpid() is always safe to call. + // SAFETY: getpid() is always safe to call. pid: unsafe { libc::getpid() }, }) } /// Updates the context after initial publication. - fn update(&mut self, payload: Vec) -> anyhow::Result<()> { - let header = self.mapping.start_addr as *mut MappingHeader; + fn update(&mut self, payload: Vec) -> io::Result<()> { + let header = self.mapping.start_addr.as_ptr() as *mut MappingHeader; let monotonic_published_at_ns = since_boottime_ns() - .ok_or_else(|| anyhow::anyhow!("could not get the current timestamp"))?; - let payload_size = payload.len().try_into().map_err(|_| { - anyhow::anyhow!("couldn't update process context: new payload too large") + .ok_or_else(|| io::Error::other("could not get the current timestamp"))?; + let payload_size: u32 = payload.len().try_into().map_err(|_| { + io::Error::other("couldn't update process context: new payload too large") })?; - - // Safety: + // A process shouldn't try to concurrently update its own context. // - // [^atomic-u64-alignment]: Page size is at minimum 4KB and will be always 8 bytes - // aligned even on exotic platforms. The offset `monotonic_published_at_ns` is 16 - // bytes, so it's 8-bytes aligned (`AtomicU64` has both a size and align of 8 bytes). - // - // The header memory is valid for both read and writes. - let published_at_atomic = - unsafe { AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) }; - - // A process shouldn't try to concurrently update its own context + // `UNPUBLISHED_OR_UPDATING` is an out-of-band sentinel, not a value that + // `CLOCK_BOOTTIME` is expected to produce after publication. Published non-zero + // timestamp values must advance monotonically; the field may temporarily hold the + // sentinel while an update is in progress. // // Note: be careful of early return while `monotonic_published_at` is still zero, as // this would effectively "lock" any future publishing. Move throwing code above this // swap, or properly restore the previous value if the former can't be done. - if published_at_atomic.swap(0, Ordering::Relaxed) == 0 { - return Err(anyhow::anyhow!( - "concurrent update of the process context is not supported" + // SAFETY: the mapping is live and valid; the timestamp field is atomic and aligned. + let published_at_atomic = unsafe { &(*header).monotonic_published_at_ns }; + let previous_published_at_ns = + published_at_atomic.swap(UNPUBLISHED_OR_UPDATING, Ordering::Acquire); + if previous_published_at_ns == UNPUBLISHED_OR_UPDATING { + return Err(io::Error::other( + "concurrent update of the process context is not supported", )); } - fence(Ordering::SeqCst); + // The timestamp also acts as the seqlock version, so it must advance even if the + // clock source returns the same value for two rapid updates. + let monotonic_published_at_ns = + monotonic_published_at_ns.max(previous_published_at_ns.saturating_add(1)); + + // Pair this with the reader's acquire fence before its second timestamp load. If a + // reader starts from the previous non-zero timestamp but copies data after this update + // begins, it must not accept that copy as the previous version: its final timestamp + // check should see `UNPUBLISHED_OR_UPDATING` or the later published timestamp. + fence(Ordering::Release); self.payload = payload; - // Safety: we own the mapping, which is live and valid for writes. The header is packed - // and thus has no alignment constraints. + // SAFETY: the mapping is live and valid; the changeable fields are atomic and aligned. unsafe { - (*header).payload_ptr = self.payload.as_ptr(); - (*header).payload_size = payload_size; + (*header) + .payload_ptr + .store(self.payload.as_ptr().cast_mut(), Ordering::Relaxed); + (*header) + .payload_size + .store(payload_size, Ordering::Relaxed); } - fence(Ordering::SeqCst); - published_at_atomic.store(monotonic_published_at_ns, Ordering::Relaxed); + published_at_atomic.store(monotonic_published_at_ns, Ordering::Release); Ok(()) } @@ -330,37 +367,40 @@ pub mod linux { /// Returns `Err` wrapping the current `errno` with `msg` as context if `addr` equals /// `MAP_FAILED`, `Ok(addr)` otherwise. - fn check_mapping_addr(addr: *mut c_void, msg: &'static str) -> anyhow::Result<*mut c_void> { + fn check_mapping_addr(addr: *mut c_void, msg: &'static str) -> io::Result> { if addr == libc::MAP_FAILED { - Err(std::io::Error::last_os_error()).context(msg) + let e = io::Error::last_os_error(); + Err(io::Error::new(e.kind(), format!("{msg}: {e}"))) } else { - Ok(addr) + // SAFETY: mmap returns a non-null pointer on success. + Ok(unsafe { NonNull::new_unchecked(addr) }) } } /// Returns `Err` wrapping the current `errno` with `msg` as context if `ret` is negative, /// `Ok(ret)` otherwise. - fn check_syscall_retval(ret: libc::c_int, msg: &'static str) -> anyhow::Result { + fn check_syscall_retval(ret: libc::c_int, msg: &'static str) -> io::Result { if ret < 0 { - Err(std::io::Error::last_os_error()).context(msg) + let e = io::Error::last_os_error(); + Err(io::Error::new(e.kind(), format!("{msg}: {e}"))) } else { Ok(ret) } } /// Creates a `memfd` file descriptor with the given name and flags. - fn try_memfd(name: &CStr, flags: libc::c_uint) -> anyhow::Result { + fn try_memfd(name: &CStr, flags: libc::c_uint) -> io::Result { // We use the raw syscall rather than `libc::memfd_create` because the latter requires // glibc >= 2.27, while `syscall()` + `SYS_memfd_create` works with any glibc version. check_syscall_retval( - // Safety: name is a valid NUL-terminated string; flags are constant bit flags. + // SAFETY: name is a valid NUL-terminated string; flags are constant bit flags. unsafe { libc::syscall(libc::SYS_memfd_create, name.as_ptr(), flags as libc::c_long) as libc::c_int }, "memfd_create failed", ) - // Safety: fd is a valid file descriptor just returned by memfd_create. + // SAFETY: fd is a valid file descriptor just returned by memfd_create. .map(|fd| unsafe { OwnedFd::from_raw_fd(fd) }) } @@ -375,7 +415,7 @@ pub mod linux { tv_sec: 0, tv_nsec: 0, }; - // Safety: ts is a valid, writable timespec. + // SAFETY: ts is a valid, writable timespec. let ret = unsafe { libc::clock_gettime(libc::CLOCK_BOOTTIME, &mut ts) }; if ret != 0 { return None; @@ -387,12 +427,59 @@ pub mod linux { } /// Locks the context handle. Returns a uniform error if the lock has been poisoned. - fn lock_context_handle() -> anyhow::Result>> { + fn lock_context_handle() -> io::Result>> { PROCESS_CONTEXT_HANDLER.lock().map_err(|_| { - anyhow::anyhow!("a thread panicked while operating on the process context handler") + io::Error::other("a thread panicked while operating on the process context handler") }) } + /// Reads and decodes the current process's OTel process context. + /// To read multiple times, construct a new reader with [`ProcessContextSelfReader::new()`]. + pub fn read() -> io::Result { + ProcessContextSelfReader::new()?.read() + } + + fn string_array(value: &AnyValue) -> Option> { + let any_value::Value::ArrayValue(array) = value.value.as_ref()? else { + return None; + }; + + array + .values + .iter() + .map(|value| match value.value.as_ref()? { + any_value::Value::StringValue(value) => Some(value.clone()), + _ => None, + }) + .collect() + } + + // The process context only carries a small resource/extra attribute set, so a linear scan + // keeps this helper allocation-free and simpler than building a temporary index. + fn find_attr<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a AnyValue> { + attrs + .iter() + .find(|attr| attr.key == key) + .and_then(|attr| attr.value.as_ref()) + } + + /// Returns the thread-local attribute key map from a decoded process context. + pub fn threadlocal_attribute_key_map(context: &ProcessContext) -> Option> { + let key = "threadlocal.attribute_key_map"; + + context + .resource + .as_ref() + .and_then(|resource| find_attr(&resource.attributes, key)) + .or_else(|| find_attr(&context.extra_attributes, key)) + .and_then(string_array) + } + + /// Reads the current process context and returns its thread-local attribute key map. + pub fn read_threadlocal_attribute_key_map() -> io::Result>> { + Ok(threadlocal_attribute_key_map(&read()?)) + } + /// Publishes or updates the process context for it to be visible by external readers. /// /// If any of the following condition holds: @@ -416,14 +503,14 @@ pub mod linux { /// Ruby) that doesn't follow with an immediate `exec` is already "taking that risk", so to /// speak (typically, if no thread is ever spawned before the fork, things are mostly fine). #[inline] - pub fn publish(context: &ProcessContext) -> anyhow::Result<()> { + pub fn publish(context: &ProcessContext) -> io::Result<()> { publish_raw_payload(context.encode_to_vec()) } - fn publish_raw_payload(payload: Vec) -> anyhow::Result<()> { + fn publish_raw_payload(payload: Vec) -> io::Result<()> { let mut guard = lock_context_handle()?; - // Safety: getpid() is always safe to call. + // SAFETY: getpid() is always safe to call. match &mut *guard { Some(handler) if handler.pid == unsafe { libc::getpid() } => handler.update(payload), Some(handler) => { @@ -451,11 +538,45 @@ pub mod linux { /// this is no-op. /// /// A call to [publish] following an [unpublish] will create a new mapping. - pub fn unpublish() -> anyhow::Result<()> { + /// + /// # Safety + /// + /// This function may only be called if it can be guaranteed that there are no in-process + /// readers, or at least that they will not be used after the call. + pub unsafe fn unpublish() -> io::Result<()> { let mut guard = lock_context_handle()?; - if let Some(ProcessContextHandle { mapping, .. }) = guard.take() { - mapping.free()?; + #[cfg(debug_assertions)] + debug_assert_eq!( + self_reader::live_reader_count(), + 0, + "unpublish called while ProcessContextSelfReader instances are live" + ); + + if let Some(ProcessContextHandle { + mapping, payload, .. + }) = guard.take() + { + // Mark the context as unavailable before freeing the mapping/payload. The fence forces + // the writing CPU not to reorder the unavailable timestamp store and the deallocation + // stores. Formal correctness, which this does not aim for, only applies to in-process + // readers, and those are forbidden to read after `unpublish()` because they could also + // hit an unmapped header, which is why this function is unsafe. The ordering should + // help out-of-process readers. + // + // SAFETY: the mapping is still live and valid; the timestamp field is atomic + // and aligned. + let header = mapping.start_addr.as_ptr() as *mut MappingHeader; + unsafe { + (*header) + .monotonic_published_at_ns + .store(UNPUBLISHED_OR_UPDATING, Ordering::Relaxed); + } + fence(Ordering::Release); + + mapping.free()?; // payload will still drop if it fails + // but we'll be stuck with a zero timestamp + drop(payload); } Ok(()) @@ -464,81 +585,17 @@ pub mod linux { #[cfg(test)] #[serial_test::serial] mod tests { - use super::MappingHeader; - use anyhow::ensure; - use std::{ - fs::File, - io::{BufRead, BufReader}, - ptr::{self, addr_of_mut}, - sync::atomic::{fence, AtomicU64, Ordering}, - }; - - /// Parses the start address from a /proc/self/maps line - fn parse_mapping_start(line: &str) -> Option { - usize::from_str_radix(line.split('-').next()?, 16).ok() - } - - /// Checks if a mapping line refers to the OTEL_CTX mapping. - fn is_named_otel_mapping(line: &str) -> bool { - let trimmed = line.trim_end(); - - // The name of the mapping is the 6th column. The separator changes (both ' ' and '\t') - // but `split_whitespace()` takes care of that. - let Some(name) = trimmed.split_whitespace().nth(5) else { - return false; - }; - - name.starts_with("/memfd:OTEL_CTX") - || name.starts_with("[anon_shmem:OTEL_CTX]") - || name.starts_with("[anon:OTEL_CTX]") - } - - /// Establishes proper synchronization/memory ordering with the writer, checking that - /// `monotonic_published_at` is not zero and that the signature is correct. Returns a - /// pointer to the initialized header in case of success. - fn verify_mapping_at(addr: usize) -> anyhow::Result<*const MappingHeader> { - let header: *mut MappingHeader = ptr::with_exposed_provenance_mut(addr); - // Safety: we're reading from our own process memory at an address we found in - // /proc/self/maps. This should be safe as long as the mapping exists and has read - // permissions. - // - // For the alignment constraint of `AtomicU64`, see [^atomic-u64-alignment]. - let published_at = unsafe { - AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) - .load(Ordering::Relaxed) - }; - ensure!(published_at != 0, "monotonic_published_at_ns is zero: couldn't read an initialized header in the candidate mapping"); - fence(Ordering::SeqCst); - - // Safety: if `monotonic_published_at_ns` is non-zero, the header is properly - // initialized and thus readable. - let signature = unsafe { &header.as_ref().unwrap().signature }; - ensure!( - signature == super::SIGNATURE, - "invalid signature in the candidate mapping" - ); - - Ok(header) - } + use std::io; + use std::sync::atomic::Ordering; - /// Find the OTEL_CTX mapping in /proc/self/maps - fn find_otel_mapping() -> anyhow::Result { - let file = File::open("/proc/self/maps")?; - let reader = BufReader::new(file); + use super::{any_value, AnyValue, KeyValue, MappingHeader, ProcessContext}; - for line in reader.lines() { - let line = line?; - - if is_named_otel_mapping(&line) { - if let Some(addr) = parse_mapping_start(&line) { - return Ok(addr); - } - } - } - - Err(anyhow::anyhow!( - "couldn't find the mapping of the process context" - )) + struct MappingHeaderSnapshot { + signature: [u8; 8], + version: u32, + payload_size: u32, + monotonic_published_at_ns: u64, + payload_ptr: *const u8, } /// Read the process context from the current process. @@ -549,11 +606,41 @@ pub mod linux { /// functions it relies on, are specialized for tests (for example, it doesn't check for /// concurrent writers after reading the header, because we know they can't be). Do not /// extract or use as it is as a generic Rust OTel process context reader. - fn read_process_context() -> anyhow::Result { - let mapping_addr = find_otel_mapping()?; - let header_ptr = verify_mapping_at(mapping_addr)?; - // Safety: the pointer returned by `verify_mapping_at` points to an initialized header - Ok(unsafe { std::ptr::read(header_ptr) }) + fn read_process_context() -> io::Result { + let mapping_addr = super::ProcessContextSelfReader::find_otel_mapping()?; + let header_ptr: *const MappingHeader = std::ptr::with_exposed_provenance(mapping_addr); + // SAFETY: the mapping was published by this test before being read. + let header = unsafe { &*header_ptr }; + Ok(MappingHeaderSnapshot { + signature: header.signature, + version: header.version, + payload_size: header.payload_size.load(Ordering::Relaxed), + monotonic_published_at_ns: header.monotonic_published_at_ns.load(Ordering::Relaxed), + payload_ptr: header.payload_ptr.load(Ordering::Relaxed).cast_const(), + }) + } + + #[test] + #[cfg_attr(miri, ignore)] + fn publish_then_read_process_context() { + let context = ProcessContext { + resource: None, + extra_attributes: vec![KeyValue { + key: "service.name".to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue("checkout".to_string())), + }), + key_ref: 0, + }], + }; + + super::publish(&context).expect("couldn't publish the process context"); + let read_context = super::read().expect("couldn't read back the process context"); + unsafe { + super::unpublish().expect("couldn't unpublish the context"); + } + + assert!(read_context == context, "read back a different context"); } #[test] @@ -566,7 +653,7 @@ pub mod linux { .expect("couldn't publish the process context"); let header = read_process_context().expect("couldn't read back the process context"); - // Safety: the published context must have put valid bytes of size payload_size in the + // SAFETY: the published context must have put valid bytes of size payload_size in the // context if the signature check succeded. let read_payload = unsafe { std::slice::from_raw_parts(header.payload_ptr, header.payload_size as usize) @@ -595,7 +682,7 @@ pub mod linux { .expect("couldn't update the process context"); let header = read_process_context().expect("couldn't read back the process context"); - // Safety: the published context must have put valid bytes of size payload_size in the + // SAFETY: the published context must have put valid bytes of size payload_size in the // context if the signature check succeded. let read_payload = unsafe { std::slice::from_raw_parts(header.payload_ptr, header.payload_size as usize) @@ -616,7 +703,9 @@ pub mod linux { ); assert!(read_payload == payload_v2.as_bytes(), "payload mismatch"); - super::unpublish().expect("couldn't unpublish the context"); + unsafe { + super::unpublish().expect("couldn't unpublish the context"); + } } #[test] @@ -628,13 +717,16 @@ pub mod linux { .expect("couldn't publish the process context"); // The mapping must be discoverable right after publishing - find_otel_mapping().expect("couldn't find the otel mapping after publishing"); + super::ProcessContextSelfReader::find_otel_mapping() + .expect("couldn't find the otel mapping after publishing"); - super::unpublish().expect("couldn't unpublish the context"); + unsafe { + super::unpublish().expect("couldn't unpublish the context"); + } // After unpublishing the name must no longer appear in /proc/self/maps assert!( - find_otel_mapping().is_err(), + super::ProcessContextSelfReader::find_otel_mapping().is_err(), "otel mapping should not be visible after unpublish" ); } diff --git a/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs b/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs new file mode 100644 index 0000000000..edf4a23e61 --- /dev/null +++ b/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs @@ -0,0 +1,270 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + fs::File, + io::{self, BufRead, BufReader}, + ptr::{self, NonNull}, + sync::atomic::{fence, Ordering}, +}; + +#[cfg(debug_assertions)] +use std::sync::atomic::AtomicUsize; + +use libdd_trace_protobuf::opentelemetry::proto::common::v1::ProcessContext; +use prost::Message; + +use super::{MappingHeader, PROCESS_CTX_VERSION, SIGNATURE, UNPUBLISHED_OR_UPDATING}; + +#[cfg(debug_assertions)] +static LIVE_SELF_READERS: AtomicUsize = AtomicUsize::new(0); + +/// Reader for the current process's OTel process context mapping. +/// +/// Locates the OTEL_CTX mapping at construction. Call [`read`](Self::read) repeatedly to fetch +/// updated context data without re-parsing `/proc/self/maps`, as long as the process has not +/// forked. After a `fork()`, reads fail and a new reader must be constructed. +pub struct ProcessContextSelfReader { + pid: libc::pid_t, + header_ptr: NonNull, +} + +// SAFETY: ProcessContextSelfReader doesn't rely on thread local state and +// only references static memory -- owns nothing. +unsafe impl Send for ProcessContextSelfReader {} +// SAFETY: ProcessContextSelfReader doesn't modify anything +unsafe impl Sync for ProcessContextSelfReader {} + +impl ProcessContextSelfReader { + /// Locates the OTEL_CTX mapping in `/proc/self/maps`. + pub fn new() -> io::Result { + let mapping_addr = Self::find_otel_mapping()?; + // SAFETY: getpid() is always safe to call. + let pid = unsafe { libc::getpid() }; + let reader = Self { + pid, + header_ptr: Self::header_ptr_from_addr(mapping_addr)?, + }; + #[cfg(debug_assertions)] + LIVE_SELF_READERS.fetch_add(1, Ordering::Relaxed); + Ok(reader) + } + + /// Reads and decodes the current process's OTel process context. + pub fn read(&self) -> io::Result { + // SAFETY: getpid() is always safe to call. + let current_pid = unsafe { libc::getpid() }; + if current_pid != self.pid { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "process context reader is stale after fork; construct a new reader", + )); + } + + // SAFETY: `header_ptr` is non-null and points to our own process memory at an address + // we found in /proc/self/maps for `self.pid`. The mapping must be readable if it is + // listed as the OTel context. + let header = unsafe { self.header_ptr.as_ref() }; + + let published_at = header.monotonic_published_at_ns.load(Ordering::Acquire); + if published_at == UNPUBLISHED_OR_UPDATING { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context is currently being updated", + )); + } + + // `signature` and `version` are initialized before the release store that publishes + // a non-zero timestamp. If the acquire load above observed that timestamp, those + // writes are visible; if it observed `UNPUBLISHED_OR_UPDATING`, we returned before + // reading them. Updates never mutate these fields, so their accesses are race-free. + // The seqlock-controlled fields must be loaded atomically because they can change + // during an update. + let signature = header.signature; + let version = header.version; + + if signature != *SIGNATURE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid signature in process context mapping", + )); + } + if version != PROCESS_CTX_VERSION { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unsupported process context version {version}"), + )); + } + + let payload_size = header.payload_size.load(Ordering::Relaxed); + let payload_ptr = header.payload_ptr.load(Ordering::Relaxed).cast_const(); + + if payload_ptr.is_null() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "process context payload pointer is null", + )); + } + + let payload_bytes = + Self::read_process_memory(self.pid, payload_ptr, payload_size as usize)?; + + // pairs with the first release fence on update() to ensure that, if we read data + // updated after the initial published time, we at least see the published + // time being set to 0 in the next load of the published time (or we could + // see a later time rather than 0) + fence(Ordering::Acquire); + + let published_at_after = header.monotonic_published_at_ns.load(Ordering::Relaxed); + if published_at != published_at_after { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context changed while being read", + )); + } + + let context = ProcessContext::decode(payload_bytes.as_slice()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; + + Ok(context) + } + + fn header_ptr_from_addr(mapping_addr: usize) -> io::Result> { + NonNull::new(ptr::with_exposed_provenance::(mapping_addr).cast_mut()) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "null process context header") + }) + } + + /// Find the OTEL_CTX mapping in /proc/self/maps. + pub(super) fn find_otel_mapping() -> io::Result { + let file = File::open("/proc/self/maps")?; + let reader = BufReader::new(file); + + for line in reader.lines() { + let line = line?; + + if Self::is_named_otel_mapping(&line) { + if let Some(addr) = Self::parse_mapping_start(&line) { + return Ok(addr); + } + } + } + + Err(io::Error::new( + io::ErrorKind::NotFound, + "couldn't find the mapping of the process context", + )) + } + + /// Parses the start address from a /proc/self/maps line. + fn parse_mapping_start(line: &str) -> Option { + usize::from_str_radix(line.split('-').next()?, 16).ok() + } + + /// Checks if a mapping line refers to the OTEL_CTX mapping. + fn is_named_otel_mapping(line: &str) -> bool { + let trimmed = line.trim_end(); + + // The mapping name is the `pathname` column documented for `/proc//maps`: + // https://github.com/torvalds/linux/blob/9147566d801602c9e7fc7f85e989735735bf38ba/Documentation/filesystems/proc.rst?plain=1#L384-L386 + // For the OTEL_CTX names we care about, it is the 6th whitespace-delimited field; + // `split_whitespace()` ignores the column padding. + let Some(name) = trimmed.split_whitespace().nth(5) else { + return false; + }; + + // The OTel process context spec says to search for entries whose names start with + // these prefixes. In `/proc//maps`, however, the optional ` (deleted)` suffix is + // emitted as a separate token, so the mapping-name token itself should match exactly. + name == "/memfd:OTEL_CTX" || name == "[anon_shmem:OTEL_CTX]" || name == "[anon:OTEL_CTX]" + } + + /// Reads `len` bytes from `addr` in the address space of `pid` via `process_vm_readv(2)`. + /// + /// Returns [`ErrorKind::WouldBlock`] for retryable races where the remote memory is no + /// longer mapped or only partially readable. The kernel reports the former as + /// [`libc::EFAULT`] from `pin_user_pages_remote()` and the latter as a short read (see + /// `process_vm_rw_core()` in `mm/process_vm_access.c`). + fn read_process_memory(pid: libc::pid_t, addr: *const u8, len: usize) -> io::Result> { + if len == 0 { + return Ok(Vec::new()); + } + + let mut buf = vec![0u8; len]; + let local_iov = libc::iovec { + iov_base: buf.as_mut_ptr().cast(), + iov_len: len, + }; + let remote_iov = libc::iovec { + iov_base: addr.cast_mut().cast(), + iov_len: len, + }; + + // SAFETY: `buf` and `addr` each span `len` bytes for the duration of the syscall. + let nbytes = unsafe { libc::process_vm_readv(pid, &local_iov, 1, &remote_iov, 1, 0) }; + + if nbytes < 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::EFAULT) { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context payload was unmapped during read", + )); + } + return Err(io::Error::new( + err.kind(), + format!("failed to read process context payload: {err}"), + )); + } + + if nbytes as usize != len { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "incomplete read of process context payload", + )); + } + + Ok(buf) + } +} + +#[cfg(debug_assertions)] +impl Drop for ProcessContextSelfReader { + fn drop(&mut self) { + LIVE_SELF_READERS.fetch_sub(1, Ordering::Relaxed); + } +} + +#[cfg(debug_assertions)] +pub(super) fn live_reader_count() -> usize { + LIVE_SELF_READERS.load(Ordering::Relaxed) +} + +#[cfg(test)] +mod tests { + use super::ProcessContextSelfReader; + + #[test] + fn is_named_otel_mapping_matches_exact_mapping_name() { + assert!(ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX" + )); + assert!(ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX (deleted)" + )); + assert!(ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon_shmem:OTEL_CTX]" + )); + assert!(ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon:OTEL_CTX]" + )); + + assert!(!ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX_BACKUP" + )); + assert!(!ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon:OTEL_CTX_old]" + )); + } +}