From cfe279d31ee2139fac363af5fcffd3d5f4042871 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Thu, 25 Jun 2026 17:07:01 +0100 Subject: [PATCH 01/14] feat(library-config): otel process context reader Add `read()`, `threadlocal_attribute_key_map()`, and `read_threadlocal_attribute_key_map()` to `otel_process_ctx::linux`, along with the `find_otel_mapping()` / `is_named_otel_mapping()` / `parse_mapping_start()`. --- libdd-library-config/src/otel_process_ctx.rs | 230 ++++++++++++------- 1 file changed, 149 insertions(+), 81 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index b0f46caa97..978fdb519e 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -16,6 +16,8 @@ pub mod linux { use std::{ ffi::{c_void, CStr}, + fs::File, + io::{BufRead, BufReader}, mem::ManuallyDrop, os::fd::{AsRawFd, FromRawFd, OwnedFd}, ptr::{self, addr_of_mut}, @@ -28,7 +30,9 @@ pub mod linux { use anyhow::Context; - use libdd_trace_protobuf::opentelemetry::proto::common::v1::ProcessContext; + use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ + any_value, AnyValue, KeyValue, ProcessContext, + }; use prost::Message; /// Current version of the process context format @@ -393,6 +397,145 @@ pub mod linux { }) } + /// Parses the start address from a /proc/self/maps line. + fn parse_mapping_start(line: &str) -> Option { + usize::from_str_radix(line.split('-').next()?, 16).ok() + } + + /// Checks if a mapping line refers to the OTEL_CTX mapping. + fn is_named_otel_mapping(line: &str) -> bool { + let trimmed = line.trim_end(); + + // The name of the mapping is the 6th column. The separator changes (both ' ' and '\t') + // but `split_whitespace()` takes care of that. 
+ let Some(name) = trimmed.split_whitespace().nth(5) else { + return false; + }; + + name.starts_with("/memfd:OTEL_CTX") + || name.starts_with("[anon_shmem:OTEL_CTX]") + || name.starts_with("[anon:OTEL_CTX]") + } + + /// Find the OTEL_CTX mapping in /proc/self/maps. + fn find_otel_mapping() -> anyhow::Result { + let file = File::open("/proc/self/maps")?; + let reader = BufReader::new(file); + + for line in reader.lines() { + let line = line?; + + if is_named_otel_mapping(&line) { + if let Some(addr) = parse_mapping_start(&line) { + return Ok(addr); + } + } + } + + Err(anyhow::anyhow!( + "couldn't find the mapping of the process context" + )) + } + + /// Reads and decodes the current process's OTel process context. + pub fn read() -> anyhow::Result { + let mapping_addr = find_otel_mapping()?; + let header: *mut MappingHeader = ptr::with_exposed_provenance_mut(mapping_addr); + + // Safety: we're reading from our own process memory at an address we found in + // /proc/self/maps. The mapping must be readable if it is listed as the OTel context. + let published_at = unsafe { + AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) + .load(Ordering::Relaxed) + }; + anyhow::ensure!( + published_at != 0, + "process context is currently being updated" + ); + fence(Ordering::SeqCst); + + // Safety: a non-zero published timestamp means the header is initialized. 
+ let (signature, version, payload_size, payload_ptr) = unsafe { + anyhow::ensure!(!header.is_null(), "null process context header"); + ( + ptr::addr_of!((*header).signature).read_unaligned(), + ptr::addr_of!((*header).version).read_unaligned(), + ptr::addr_of!((*header).payload_size).read_unaligned(), + ptr::addr_of!((*header).payload_ptr).read_unaligned(), + ) + }; + + anyhow::ensure!( + signature == *SIGNATURE, + "invalid signature in process context mapping" + ); + anyhow::ensure!( + version == PROCESS_CTX_VERSION, + "unsupported process context version {version}" + ); + anyhow::ensure!( + !payload_ptr.is_null(), + "process context payload pointer is null" + ); + + // Safety: the publisher stores a pointer to `payload_size` initialized bytes and keeps that + // allocation alive until the next update. The timestamp check below detects concurrent + // updates and discards the read. + let payload = unsafe { std::slice::from_raw_parts(payload_ptr, payload_size as usize) }; + let context = ProcessContext::decode(payload)?; + + fence(Ordering::SeqCst); + let published_at_after = unsafe { + AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) + .load(Ordering::Relaxed) + }; + anyhow::ensure!( + published_at == published_at_after, + "process context changed while being read" + ); + + Ok(context) + } + + fn string_array(value: &AnyValue) -> Option> { + let any_value::Value::ArrayValue(array) = value.value.as_ref()? else { + return None; + }; + + array + .values + .iter() + .map(|value| match value.value.as_ref()? { + any_value::Value::StringValue(value) => Some(value.clone()), + _ => None, + }) + .collect() + } + + fn find_attr<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a AnyValue> { + attrs + .iter() + .find(|attr| attr.key == key) + .and_then(|attr| attr.value.as_ref()) + } + + /// Returns the thread-local attribute key map from a decoded process context. 
+ pub fn threadlocal_attribute_key_map(context: &ProcessContext) -> Option> { + let key = "threadlocal.attribute_key_map"; + + context + .resource + .as_ref() + .and_then(|resource| find_attr(&resource.attributes, key)) + .or_else(|| find_attr(&context.extra_attributes, key)) + .and_then(string_array) + } + + /// Reads the current process context and returns its thread-local attribute key map. + pub fn read_threadlocal_attribute_key_map() -> anyhow::Result>> { + Ok(threadlocal_attribute_key_map(&read()?)) + } + /// Publishes or updates the process context for it to be visible by external readers. /// /// If any of the following condition holds: @@ -465,81 +608,6 @@ pub mod linux { #[serial_test::serial] mod tests { use super::MappingHeader; - use anyhow::ensure; - use std::{ - fs::File, - io::{BufRead, BufReader}, - ptr::{self, addr_of_mut}, - sync::atomic::{fence, AtomicU64, Ordering}, - }; - - /// Parses the start address from a /proc/self/maps line - fn parse_mapping_start(line: &str) -> Option { - usize::from_str_radix(line.split('-').next()?, 16).ok() - } - - /// Checks if a mapping line refers to the OTEL_CTX mapping. - fn is_named_otel_mapping(line: &str) -> bool { - let trimmed = line.trim_end(); - - // The name of the mapping is the 6th column. The separator changes (both ' ' and '\t') - // but `split_whitespace()` takes care of that. - let Some(name) = trimmed.split_whitespace().nth(5) else { - return false; - }; - - name.starts_with("/memfd:OTEL_CTX") - || name.starts_with("[anon_shmem:OTEL_CTX]") - || name.starts_with("[anon:OTEL_CTX]") - } - - /// Establishes proper synchronization/memory ordering with the writer, checking that - /// `monotonic_published_at` is not zero and that the signature is correct. Returns a - /// pointer to the initialized header in case of success. 
- fn verify_mapping_at(addr: usize) -> anyhow::Result<*const MappingHeader> { - let header: *mut MappingHeader = ptr::with_exposed_provenance_mut(addr); - // Safety: we're reading from our own process memory at an address we found in - // /proc/self/maps. This should be safe as long as the mapping exists and has read - // permissions. - // - // For the alignment constraint of `AtomicU64`, see [^atomic-u64-alignment]. - let published_at = unsafe { - AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) - .load(Ordering::Relaxed) - }; - ensure!(published_at != 0, "monotonic_published_at_ns is zero: couldn't read an initialized header in the candidate mapping"); - fence(Ordering::SeqCst); - - // Safety: if `monotonic_published_at_ns` is non-zero, the header is properly - // initialized and thus readable. - let signature = unsafe { &header.as_ref().unwrap().signature }; - ensure!( - signature == super::SIGNATURE, - "invalid signature in the candidate mapping" - ); - - Ok(header) - } - - /// Find the OTEL_CTX mapping in /proc/self/maps - fn find_otel_mapping() -> anyhow::Result { - let file = File::open("/proc/self/maps")?; - let reader = BufReader::new(file); - - for line in reader.lines() { - let line = line?; - - if is_named_otel_mapping(&line) { - if let Some(addr) = parse_mapping_start(&line) { - return Ok(addr); - } - } - } - - Err(anyhow::anyhow!( - "couldn't find the mapping of the process context" - )) - } /// Read the process context from the current process. /// @@ -550,9 +618,9 @@ pub mod linux { /// concurrent writers after reading the header, because we know they can't be). Do not /// extract or use as it is as a generic Rust OTel process context reader. 
fn read_process_context() -> anyhow::Result { - let mapping_addr = find_otel_mapping()?; - let header_ptr = verify_mapping_at(mapping_addr)?; - // Safety: the pointer returned by `verify_mapping_at` points to an initialized header + let mapping_addr = super::find_otel_mapping()?; + let header_ptr: *const MappingHeader = std::ptr::with_exposed_provenance(mapping_addr); + // Safety: the mapping was published by this test before being read. Ok(unsafe { std::ptr::read(header_ptr) }) } @@ -628,13 +696,13 @@ pub mod linux { .expect("couldn't publish the process context"); // The mapping must be discoverable right after publishing - find_otel_mapping().expect("couldn't find the otel mapping after publishing"); + super::find_otel_mapping().expect("couldn't find the otel mapping after publishing"); super::unpublish().expect("couldn't unpublish the context"); // After unpublishing the name must no longer appear in /proc/self/maps assert!( - find_otel_mapping().is_err(), + super::find_otel_mapping().is_err(), "otel mapping should not be visible after unpublish" ); } From adc1fa206f91b0d30636f8ab83a8b54e8918e9f1 Mon Sep 17 00:00:00 2001 From: Levi Morrison Date: Mon, 29 Jun 2026 23:35:00 -0600 Subject: [PATCH 02/14] refactor(library-config)!: removed packed from MappingHeader, anyhow::Result -> io::Result --- libdd-library-config/src/otel_process_ctx.rs | 241 ++++++++++--------- 1 file changed, 125 insertions(+), 116 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index 978fdb519e..be1d12368f 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -17,10 +17,10 @@ pub mod linux { use std::{ ffi::{c_void, CStr}, fs::File, - io::{BufRead, BufReader}, + io::{self, BufRead, BufReader}, mem::ManuallyDrop, os::fd::{AsRawFd, FromRawFd, OwnedFd}, - ptr::{self, addr_of_mut}, + ptr, sync::{ atomic::{fence, AtomicU64, Ordering}, Mutex, MutexGuard, @@ -28,8 +28,6 
@@ pub mod linux { time::Duration, }; - use anyhow::Context; - use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ any_value, AnyValue, KeyValue, ProcessContext, }; @@ -51,17 +49,16 @@ pub mod linux { /// based synchronization requires the use of atomics to have any effect (see [Mandatory /// atomic](https://doc.rust-lang.org/std/sync/atomic/fn.fence.html#mandatory-atomic)) /// - /// We use `monotonic_published_at_ns` for synchronization with the reader. Ideally, it should - /// be an `AtomicU64`, but this is incompatible with `#[repr(C, packed)]` by default, as it - /// could be misaligned. In our case, given the page size and the layout of `MappingHeader`, it - /// is actually 8-bytes aligned: we use [`AtomicU64::from_ptr`] to create an atomic view when - /// synchronization is needed. - #[repr(C, packed)] + /// We use `monotonic_published_at_ns` for synchronization with the reader. `AtomicU64` has the + /// same in-memory representation as `u64` and is 8-bytes aligned. The field lands at offset 16 + /// in the struct (after 8 bytes of signature + 4 bytes version + 4 bytes payload_size), which + /// satisfies that alignment on any page-aligned base address. + #[repr(C)] struct MappingHeader { signature: [u8; 8], version: u32, payload_size: u32, - monotonic_published_at_ns: u64, + monotonic_published_at_ns: AtomicU64, payload_ptr: *const u8, } @@ -79,7 +76,7 @@ pub mod linux { start_addr: *mut c_void, } - // Safety: MemMapping represents ownership over the mapped region. It never leaks or + // SAFETY: MemMapping represents ownership over the mapped region. It never leaks or // share the internal pointer. It's also safe to drop (`munmap`) from a different thread. unsafe impl Send for MemMapping {} @@ -95,20 +92,20 @@ pub mod linux { /// /// `memfd` is the preferred method, but this function fallbacks to an anonymous mapping if /// `memfd` failed for any reason. 
- fn new() -> anyhow::Result { + fn new() -> io::Result { let size = mapping_size(); try_memfd(MAPPING_NAME, libc::MFD_CLOEXEC | libc::MFD_NOEXEC_SEAL | libc::MFD_ALLOW_SEALING) .or_else(|_| try_memfd(MAPPING_NAME, libc::MFD_CLOEXEC | libc::MFD_ALLOW_SEALING)) .and_then(|fd| { - // Safety: fd is a valid open file descriptor. + // SAFETY: fd is a valid open file descriptor. check_syscall_retval( unsafe { libc::ftruncate(fd.as_raw_fd(), mapping_size() as libc::off_t) }, "ftruncate failed" )?; - // Safety: we pass a null pointer to mmap which is unconditionally ok + // SAFETY: we pass a null pointer to mmap which is unconditionally ok let start_addr = check_mapping_addr( unsafe { libc::mmap( @@ -128,7 +125,7 @@ pub mod linux { }) // If any previous step failed, we fallback to an anonymous mapping .or_else(|_| { - // Safety: we pass a null pointer to mmap, no precondition to uphold + // SAFETY: we pass a null pointer to mmap, no precondition to uphold let start_addr = check_mapping_addr( unsafe { libc::mmap( @@ -148,8 +145,8 @@ pub mod linux { } /// Makes this mapping discoverable by giving it a name. - fn set_name(&mut self) -> anyhow::Result<()> { - // Safety: self.start_addr is valid for mapping_size() bytes as per MemMapping + fn set_name(&mut self) -> io::Result<()> { + // SAFETY: self.start_addr is valid for mapping_size() bytes as per MemMapping // invariants. name is a valid NUL-terminated string that outlives the prctl call. check_syscall_retval( unsafe { @@ -169,8 +166,8 @@ pub mod linux { /// Unmaps the underlying memory region. This has same effect as dropping `self`, but /// propagates potential errors. - fn free(mut self) -> anyhow::Result<()> { - // Safety: We put `self` in a `ManuallyDrop`, which prevents drop and future calls to + fn free(mut self) -> io::Result<()> { + // SAFETY: We put `self` in a `ManuallyDrop`, which prevents drop and future calls to // `free()`. 
unsafe { self.unmap()?; @@ -191,9 +188,9 @@ pub mod linux { /// /// Practically, `self` must be put in a `ManuallyDrop` wrapper and forgotten, or being in /// the process of being dropped. - unsafe fn unmap(&mut self) -> anyhow::Result<()> { + unsafe fn unmap(&mut self) -> io::Result<()> { check_syscall_retval( - // Safety: upheld by the caller. + // SAFETY: upheld by the caller. unsafe { libc::munmap(self.start_addr, mapping_size()) }, "munmap failed when freeing the process context", )?; @@ -204,7 +201,7 @@ pub mod linux { impl Drop for MemMapping { fn drop(&mut self) { - // Safety: `self` is being dropped + // SAFETY: `self` is being dropped let _ = unsafe { self.unmap() }; } } @@ -224,27 +221,28 @@ pub mod linux { impl ProcessContextHandle { /// Initial publication of the process context. Creates an appropriate memory mapping. - fn publish(payload: Vec) -> anyhow::Result { + fn publish(payload: Vec) -> io::Result { let mut mapping = MemMapping::new()?; let size = mapping_size(); check_syscall_retval( - // Safety: the invariants of MemMapping ensures `start_addr` is not null and comes + // SAFETY: the invariants of MemMapping ensures `start_addr` is not null and comes // from a previous call to `mmap` unsafe { libc::madvise(mapping.start_addr, size, libc::MADV_DONTFORK) }, "madvise MADVISE_DONTFORK failed", )?; let published_at_ns = since_boottime_ns().ok_or_else(|| { - anyhow::anyhow!("failed to get current time for process context publication") + io::Error::other("failed to get current time for process context publication") })?; let header = mapping.start_addr as *mut MappingHeader; unsafe { - // Safety: MappingHeader is packed, thus have no alignment requirement. It points - // to a freshly mmaped region which is valid for writing at least `mapping_size()`, - // which we make sure is greater than the size of MappingHeader. + // SAFETY: header points to a freshly mmaped region valid for at least + // `mapping_size()` bytes, which we ensure is >= size_of::(). 
The + // base address is page-aligned, so all fields including `monotonic_published_at_ns` + // (at offset 16) satisfy their alignment requirements. ptr::write( header, MappingHeader { @@ -253,9 +251,9 @@ pub mod linux { payload_size: payload .len() .try_into() - .context("payload size overflowed")?, + .map_err(|_| io::Error::other("payload size overflowed"))?, // will be set atomically at last - monotonic_published_at_ns: 0, + monotonic_published_at_ns: AtomicU64::new(0), payload_ptr: payload.as_ptr(), }, ); @@ -266,7 +264,8 @@ pub mod linux { // To do so, we implement synchronization during publication _as if the reader were // another thread of this program_, using atomics and fences. fence(Ordering::SeqCst); - AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) + (*header) + .monotonic_published_at_ns .store(published_at_ns, Ordering::Relaxed); } @@ -279,30 +278,24 @@ pub mod linux { Ok(ProcessContextHandle { mapping, payload, - // Safety: getpid() is always safe to call. + // SAFETY: getpid() is always safe to call. pid: unsafe { libc::getpid() }, }) } /// Updates the context after initial publication. - fn update(&mut self, payload: Vec) -> anyhow::Result<()> { + fn update(&mut self, payload: Vec) -> io::Result<()> { let header = self.mapping.start_addr as *mut MappingHeader; let monotonic_published_at_ns = since_boottime_ns() - .ok_or_else(|| anyhow::anyhow!("could not get the current timestamp"))?; + .ok_or_else(|| io::Error::other("could not get the current timestamp"))?; let payload_size = payload.len().try_into().map_err(|_| { - anyhow::anyhow!("couldn't update process context: new payload too large") + io::Error::other("couldn't update process context: new payload too large") })?; - // Safety: - // - // [^atomic-u64-alignment]: Page size is at minimum 4KB and will be always 8 bytes - // aligned even on exotic platforms. 
The offset `monotonic_published_at_ns` is 16 - // bytes, so it's 8-bytes aligned (`AtomicU64` has both a size and align of 8 bytes). - // - // The header memory is valid for both read and writes. - let published_at_atomic = - unsafe { AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) }; + // SAFETY: the mapping is live and valid; the header pointer is page-aligned which + // satisfies AtomicU64's alignment. + let published_at_atomic = unsafe { &(*header).monotonic_published_at_ns }; // A process shouldn't try to concurrently update its own context // @@ -310,15 +303,15 @@ pub mod linux { // this would effectively "lock" any future publishing. Move throwing code above this // swap, or properly restore the previous value if the former can't be done. if published_at_atomic.swap(0, Ordering::Relaxed) == 0 { - return Err(anyhow::anyhow!( - "concurrent update of the process context is not supported" + return Err(io::Error::other( + "concurrent update of the process context is not supported", )); } fence(Ordering::SeqCst); self.payload = payload; - // Safety: we own the mapping, which is live and valid for writes. The header is packed + // SAFETY: we own the mapping, which is live and valid for writes. The header is packed // and thus has no alignment constraints. unsafe { (*header).payload_ptr = self.payload.as_ptr(); @@ -334,9 +327,10 @@ pub mod linux { /// Returns `Err` wrapping the current `errno` with `msg` as context if `addr` equals /// `MAP_FAILED`, `Ok(addr)` otherwise. 
- fn check_mapping_addr(addr: *mut c_void, msg: &'static str) -> anyhow::Result<*mut c_void> { + fn check_mapping_addr(addr: *mut c_void, msg: &'static str) -> io::Result<*mut c_void> { if addr == libc::MAP_FAILED { - Err(std::io::Error::last_os_error()).context(msg) + let e = io::Error::last_os_error(); + Err(io::Error::new(e.kind(), format!("{msg}: {e}"))) } else { Ok(addr) } @@ -344,27 +338,28 @@ pub mod linux { /// Returns `Err` wrapping the current `errno` with `msg` as context if `ret` is negative, /// `Ok(ret)` otherwise. - fn check_syscall_retval(ret: libc::c_int, msg: &'static str) -> anyhow::Result { + fn check_syscall_retval(ret: libc::c_int, msg: &'static str) -> io::Result { if ret < 0 { - Err(std::io::Error::last_os_error()).context(msg) + let e = io::Error::last_os_error(); + Err(io::Error::new(e.kind(), format!("{msg}: {e}"))) } else { Ok(ret) } } /// Creates a `memfd` file descriptor with the given name and flags. - fn try_memfd(name: &CStr, flags: libc::c_uint) -> anyhow::Result { + fn try_memfd(name: &CStr, flags: libc::c_uint) -> io::Result { // We use the raw syscall rather than `libc::memfd_create` because the latter requires // glibc >= 2.27, while `syscall()` + `SYS_memfd_create` works with any glibc version. check_syscall_retval( - // Safety: name is a valid NUL-terminated string; flags are constant bit flags. + // SAFETY: name is a valid NUL-terminated string; flags are constant bit flags. unsafe { libc::syscall(libc::SYS_memfd_create, name.as_ptr(), flags as libc::c_long) as libc::c_int }, "memfd_create failed", ) - // Safety: fd is a valid file descriptor just returned by memfd_create. + // SAFETY: fd is a valid file descriptor just returned by memfd_create. .map(|fd| unsafe { OwnedFd::from_raw_fd(fd) }) } @@ -379,7 +374,7 @@ pub mod linux { tv_sec: 0, tv_nsec: 0, }; - // Safety: ts is a valid, writable timespec. + // SAFETY: ts is a valid, writable timespec. 
let ret = unsafe { libc::clock_gettime(libc::CLOCK_BOOTTIME, &mut ts) }; if ret != 0 { return None; @@ -391,9 +386,9 @@ pub mod linux { } /// Locks the context handle. Returns a uniform error if the lock has been poisoned. - fn lock_context_handle() -> anyhow::Result>> { + fn lock_context_handle() -> io::Result>> { PROCESS_CONTEXT_HANDLER.lock().map_err(|_| { - anyhow::anyhow!("a thread panicked while operating on the process context handler") + io::Error::other("a thread panicked while operating on the process context handler") }) } @@ -418,7 +413,7 @@ pub mod linux { } /// Find the OTEL_CTX mapping in /proc/self/maps. - fn find_otel_mapping() -> anyhow::Result { + fn find_otel_mapping() -> io::Result { let file = File::open("/proc/self/maps")?; let reader = BufReader::new(file); @@ -432,67 +427,79 @@ pub mod linux { } } - Err(anyhow::anyhow!( - "couldn't find the mapping of the process context" + Err(io::Error::new( + io::ErrorKind::NotFound, + "couldn't find the mapping of the OTel process context", )) } /// Reads and decodes the current process's OTel process context. - pub fn read() -> anyhow::Result { + pub fn read() -> io::Result { let mapping_addr = find_otel_mapping()?; let header: *mut MappingHeader = ptr::with_exposed_provenance_mut(mapping_addr); - // Safety: we're reading from our own process memory at an address we found in + // SAFETY: we're reading from our own process memory at an address we found in // /proc/self/maps. The mapping must be readable if it is listed as the OTel context. 
- let published_at = unsafe { - AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) - .load(Ordering::Relaxed) - }; - anyhow::ensure!( - published_at != 0, - "process context is currently being updated" - ); + let published_at = unsafe { (*header).monotonic_published_at_ns.load(Ordering::Relaxed) }; + if published_at == 0 { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context is currently being updated", + )); + } fence(Ordering::SeqCst); - // Safety: a non-zero published timestamp means the header is initialized. - let (signature, version, payload_size, payload_ptr) = unsafe { - anyhow::ensure!(!header.is_null(), "null process context header"); - ( - ptr::addr_of!((*header).signature).read_unaligned(), - ptr::addr_of!((*header).version).read_unaligned(), - ptr::addr_of!((*header).payload_size).read_unaligned(), - ptr::addr_of!((*header).payload_ptr).read_unaligned(), - ) - }; + let (signature, version, payload_size, payload_ptr) = + // SAFETY: a non-zero published timestamp means the header is initialized. 
+ if let Some(header) = unsafe { header.as_ref() } { + ( + header.signature, + header.version, + header.payload_size, + header.payload_ptr, + ) + } else { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "null process context header", + )); + }; - anyhow::ensure!( - signature == *SIGNATURE, - "invalid signature in process context mapping" - ); - anyhow::ensure!( - version == PROCESS_CTX_VERSION, - "unsupported process context version {version}" - ); - anyhow::ensure!( - !payload_ptr.is_null(), - "process context payload pointer is null" - ); - - // Safety: the publisher stores a pointer to `payload_size` initialized bytes and keeps that + if signature != *SIGNATURE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid signature in process context mapping", + )); + } + if version != PROCESS_CTX_VERSION { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unsupported process context version {version}"), + )); + } + if payload_ptr.is_null() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "process context payload pointer is null", + )); + } + + // SAFETY: the publisher stores a pointer to `payload_size` initialized bytes and keeps that // allocation alive until the next update. The timestamp check below detects concurrent // updates and discards the read. 
let payload = unsafe { std::slice::from_raw_parts(payload_ptr, payload_size as usize) }; - let context = ProcessContext::decode(payload)?; + let context = ProcessContext::decode(payload) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; fence(Ordering::SeqCst); - let published_at_after = unsafe { - AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) - .load(Ordering::Relaxed) - }; - anyhow::ensure!( - published_at == published_at_after, - "process context changed while being read" - ); + let published_at_after = + unsafe { (*header).monotonic_published_at_ns.load(Ordering::Relaxed) }; + if published_at != published_at_after { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context changed while being read", + )); + } Ok(context) } @@ -532,7 +539,7 @@ pub mod linux { } /// Reads the current process context and returns its thread-local attribute key map. - pub fn read_threadlocal_attribute_key_map() -> anyhow::Result>> { + pub fn read_threadlocal_attribute_key_map() -> io::Result>> { Ok(threadlocal_attribute_key_map(&read()?)) } @@ -559,14 +566,14 @@ pub mod linux { /// Ruby) that doesn't follow with an immediate `exec` is already "taking that risk", so to /// speak (typically, if no thread is ever spawned before the fork, things are mostly fine). #[inline] - pub fn publish(context: &ProcessContext) -> anyhow::Result<()> { + pub fn publish(context: &ProcessContext) -> io::Result<()> { publish_raw_payload(context.encode_to_vec()) } - fn publish_raw_payload(payload: Vec) -> anyhow::Result<()> { + fn publish_raw_payload(payload: Vec) -> io::Result<()> { let mut guard = lock_context_handle()?; - // Safety: getpid() is always safe to call. + // SAFETY: getpid() is always safe to call. match &mut *guard { Some(handler) if handler.pid == unsafe { libc::getpid() } => handler.update(payload), Some(handler) => { @@ -594,7 +601,7 @@ pub mod linux { /// this is no-op. 
/// /// A call to [publish] following an [unpublish] will create a new mapping. - pub fn unpublish() -> anyhow::Result<()> { + pub fn unpublish() -> io::Result<()> { let mut guard = lock_context_handle()?; if let Some(ProcessContextHandle { mapping, .. }) = guard.take() { @@ -607,6 +614,8 @@ pub mod linux { #[cfg(test)] #[serial_test::serial] mod tests { + use std::sync::atomic::Ordering; + use super::MappingHeader; /// Read the process context from the current process. @@ -617,10 +626,10 @@ pub mod linux { /// functions it relies on, are specialized for tests (for example, it doesn't check for /// concurrent writers after reading the header, because we know they can't be). Do not /// extract or use as it is as a generic Rust OTel process context reader. - fn read_process_context() -> anyhow::Result { + fn read_process_context() -> io::Result { let mapping_addr = super::find_otel_mapping()?; let header_ptr: *const MappingHeader = std::ptr::with_exposed_provenance(mapping_addr); - // Safety: the mapping was published by this test before being read. + // SAFETY: the mapping was published by this test before being read. Ok(unsafe { std::ptr::read(header_ptr) }) } @@ -634,7 +643,7 @@ pub mod linux { .expect("couldn't publish the process context"); let header = read_process_context().expect("couldn't read back the process context"); - // Safety: the published context must have put valid bytes of size payload_size in the + // SAFETY: the published context must have put valid bytes of size payload_size in the // context if the signature check succeded. 
let read_payload = unsafe { std::slice::from_raw_parts(header.payload_ptr, header.payload_size as usize) @@ -650,12 +659,12 @@ pub mod linux { "wrong payload size" ); assert!( - header.monotonic_published_at_ns > 0, + header.monotonic_published_at_ns.load(Ordering::Relaxed) > 0, "monotonic_published_at_ns is zero" ); assert!(read_payload == payload_v1.as_bytes(), "payload mismatch"); - let published_at_ns_v1 = header.monotonic_published_at_ns; + let published_at_ns_v1 = header.monotonic_published_at_ns.load(Ordering::Relaxed); // Ensure the clock advances so the updated timestamp is strictly greater std::thread::sleep(std::time::Duration::from_nanos(10)); @@ -663,7 +672,7 @@ pub mod linux { .expect("couldn't update the process context"); let header = read_process_context().expect("couldn't read back the process context"); - // Safety: the published context must have put valid bytes of size payload_size in the + // SAFETY: the published context must have put valid bytes of size payload_size in the // context if the signature check succeded. 
let read_payload = unsafe { std::slice::from_raw_parts(header.payload_ptr, header.payload_size as usize) @@ -679,7 +688,7 @@ pub mod linux { "wrong payload size" ); assert!( - header.monotonic_published_at_ns > published_at_ns_v1, + header.monotonic_published_at_ns.load(Ordering::Relaxed) > published_at_ns_v1, "published_at_ns should be strictly greater after update" ); assert!(read_payload == payload_v2.as_bytes(), "payload mismatch"); From 67cc62f4892e2b734a6f5ba808c7756fb138ac94 Mon Sep 17 00:00:00 2001 From: Levi Morrison Date: Tue, 30 Jun 2026 14:39:36 -0600 Subject: [PATCH 03/14] test(library-config): compile-time layout assertions for MappingHeader Co-Authored-By: Claude Sonnet 4.6 --- libdd-library-config/src/otel_process_ctx.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index be1d12368f..a4a9206a8e 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -62,6 +62,19 @@ pub mod linux { payload_ptr: *const u8, } + // Compile-time verification that MappingHeader matches the field offsets and total size + // mandated by the OTel process context spec: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/oteps/profiles/4719-process-ctx.md + const _: () = { + use std::mem::{offset_of, size_of}; + assert!(offset_of!(MappingHeader, signature) == 0); + assert!(offset_of!(MappingHeader, version) == 8); + assert!(offset_of!(MappingHeader, payload_size) == 12); + assert!(offset_of!(MappingHeader, monotonic_published_at_ns) == 16); + assert!(offset_of!(MappingHeader, payload_ptr) == 24); + assert!(size_of::() == 32); + }; + /// The shared memory mapped area to publish the context to. The memory region is owned by a /// [MemMapping] instance and is automatically unmapped upon drop. 
/// From d363e03a90531fb4d414f72b06814b07eb75f26a Mon Sep 17 00:00:00 2001 From: Levi Morrison Date: Tue, 30 Jun 2026 16:48:32 -0600 Subject: [PATCH 04/14] fix(library-config): import std::io in tests --- libdd-library-config/src/otel_process_ctx.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index a4a9206a8e..4ac24ca262 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -627,6 +627,7 @@ pub mod linux { #[cfg(test)] #[serial_test::serial] mod tests { + use std::io; use std::sync::atomic::Ordering; use super::MappingHeader; From 31b32fe662fee8f5f90033a0b5eb0c3e8a5397f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Thu, 2 Jul 2026 11:47:07 +0100 Subject: [PATCH 05/14] fix(library-config): make OTel process context reads race-safe --- libdd-library-config/src/otel_process_ctx.rs | 285 +++++++++++++------ 1 file changed, 191 insertions(+), 94 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index 4ac24ca262..ab0155a018 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -3,13 +3,6 @@ //! Implementation of the publisher part of the [OTEL process //! context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719) -//! -//! # A note on race conditions -//! -//! Process context sharing implies concurrently writing to a memory area that another process -//! might be actively reading. However, reading isn't done as direct memory accesses but goes -//! through the OS, so the Rust definition of race conditions doesn't really apply. We also use -//! atomics and fences, see MappingHeader's documentation. 
#[cfg(target_os = "linux")] #[cfg(target_has_atomic = "64")] @@ -17,12 +10,12 @@ pub mod linux { use std::{ ffi::{c_void, CStr}, fs::File, - io::{self, BufRead, BufReader}, - mem::ManuallyDrop, + io::{self, BufRead, BufReader, ErrorKind}, + mem::{size_of, ManuallyDrop}, os::fd::{AsRawFd, FromRawFd, OwnedFd}, - ptr, + ptr::{self, NonNull}, sync::{ - atomic::{fence, AtomicU64, Ordering}, + atomic::{fence, AtomicPtr, AtomicU32, AtomicU64, Ordering}, Mutex, MutexGuard, }, time::Duration, @@ -50,16 +43,18 @@ pub mod linux { /// atomic](https://doc.rust-lang.org/std/sync/atomic/fn.fence.html#mandatory-atomic)) /// /// We use `monotonic_published_at_ns` for synchronization with the reader. `AtomicU64` has the - /// same in-memory representation as `u64` and is 8-bytes aligned. The field lands at offset 16 - /// in the struct (after 8 bytes of signature + 4 bytes version + 4 bytes payload_size), which - /// satisfies that alignment on any page-aligned base address. + /// same in-memory representation as `u64` and is 8-bytes aligned. `payload_size` and + /// `payload_ptr` are atomic too because they are modified while readers may be sampling the + /// header. The timestamp field lands at offset 16 in the struct (after 8 bytes of signature + + /// 4 bytes version + 4 bytes payload_size), which satisfies that alignment on any + /// page-aligned base address. 
#[repr(C)] struct MappingHeader { signature: [u8; 8], version: u32, - payload_size: u32, + payload_size: AtomicU32, monotonic_published_at_ns: AtomicU64, - payload_ptr: *const u8, + payload_ptr: AtomicPtr, } // Compile-time verification that MappingHeader matches the field offsets and total size @@ -73,6 +68,9 @@ pub mod linux { assert!(offset_of!(MappingHeader, monotonic_published_at_ns) == 16); assert!(offset_of!(MappingHeader, payload_ptr) == 24); assert!(size_of::() == 32); + assert!(core::mem::align_of::() == 8); + assert!(core::mem::align_of::() == core::mem::align_of::()); + assert!(core::mem::align_of::>() == core::mem::align_of::<*mut u8>()); }; /// The shared memory mapped area to publish the context to. The memory region is owned by a @@ -86,7 +84,7 @@ pub mod linux { /// - once `self` has been dropped, no memory access must be performed on the memory previously /// pointed to by `start`. struct MemMapping { - start_addr: *mut c_void, + start_addr: NonNull, } // SAFETY: MemMapping represents ownership over the mapped region. It never leaks or @@ -105,6 +103,8 @@ pub mod linux { /// /// `memfd` is the preferred method, but this function fallbacks to an anonymous mapping if /// `memfd` failed for any reason. + /// + /// The memory is guaranteed to initialized to zeroes. fn new() -> io::Result { let size = mapping_size(); @@ -166,7 +166,7 @@ pub mod linux { libc::prctl( libc::PR_SET_VMA, libc::PR_SET_VMA_ANON_NAME as libc::c_ulong, - self.start_addr as libc::c_ulong, + self.start_addr.as_ptr() as libc::c_ulong, mapping_size() as libc::c_ulong, MAPPING_NAME.as_ptr() as libc::c_ulong, ) @@ -204,7 +204,7 @@ pub mod linux { unsafe fn unmap(&mut self) -> io::Result<()> { check_syscall_retval( // SAFETY: upheld by the caller. 
- unsafe { libc::munmap(self.start_addr, mapping_size()) }, + unsafe { libc::munmap(self.start_addr.as_ptr(), mapping_size()) }, "munmap failed when freeing the process context", )?; @@ -223,7 +223,7 @@ pub mod linux { struct ProcessContextHandle { mapping: MemMapping, /// Once published, and until the next update is complete, the backing allocation of - /// `payload` might be read by external processes and thus most not move (e.g. by resizing + /// `payload` might be read by external processes and thus must not move (e.g. by resizing /// or drop). #[allow(unused)] payload: Vec, @@ -237,11 +237,10 @@ pub mod linux { fn publish(payload: Vec) -> io::Result { let mut mapping = MemMapping::new()?; let size = mapping_size(); - check_syscall_retval( // SAFETY: the invariants of MemMapping ensures `start_addr` is not null and comes // from a previous call to `mmap` - unsafe { libc::madvise(mapping.start_addr, size, libc::MADV_DONTFORK) }, + unsafe { libc::madvise(mapping.start_addr.as_ptr(), size, libc::MADV_DONTFORK) }, "madvise MADVISE_DONTFORK failed", )?; @@ -249,7 +248,7 @@ pub mod linux { io::Error::other("failed to get current time for process context publication") })?; - let header = mapping.start_addr as *mut MappingHeader; + let header = mapping.start_addr.as_ptr() as *mut MappingHeader; unsafe { // SAFETY: header points to a freshly mmaped region valid for at least @@ -261,25 +260,20 @@ pub mod linux { MappingHeader { signature: *SIGNATURE, version: PROCESS_CTX_VERSION, - payload_size: payload - .len() - .try_into() - .map_err(|_| io::Error::other("payload size overflowed"))?, + payload_size: AtomicU32::new( + payload + .len() + .try_into() + .map_err(|_| io::Error::other("payload size overflowed"))?, + ), // will be set atomically at last monotonic_published_at_ns: AtomicU64::new(0), - payload_ptr: payload.as_ptr(), + payload_ptr: AtomicPtr::new(payload.as_ptr().cast_mut()), }, ); - // We typically want to avoid the compiler and the hardware to re-order the 
write - // to the `monotonic_published_at_ns` (which should be last according to the - // specification) with the writes to other fields of the header. - // - // To do so, we implement synchronization during publication _as if the reader were - // another thread of this program_, using atomics and fences. - fence(Ordering::SeqCst); (*header) .monotonic_published_at_ns - .store(published_at_ns, Ordering::Relaxed); + .store(published_at_ns, Ordering::Release); } // Note that naming must be unconditionally attempted, even on kernels where we might @@ -298,41 +292,46 @@ pub mod linux { /// Updates the context after initial publication. fn update(&mut self, payload: Vec) -> io::Result<()> { - let header = self.mapping.start_addr as *mut MappingHeader; + let header = self.mapping.start_addr.as_ptr() as *mut MappingHeader; let monotonic_published_at_ns = since_boottime_ns() .ok_or_else(|| io::Error::other("could not get the current timestamp"))?; let payload_size = payload.len().try_into().map_err(|_| { io::Error::other("couldn't update process context: new payload too large") })?; - - // SAFETY: the mapping is live and valid; the header pointer is page-aligned which - // satisfies AtomicU64's alignment. - let published_at_atomic = unsafe { &(*header).monotonic_published_at_ns }; - // A process shouldn't try to concurrently update its own context // // Note: be careful of early return while `monotonic_published_at` is still zero, as // this would effectively "lock" any future publishing. Move throwing code above this // swap, or properly restore the previous value if the former can't be done. - if published_at_atomic.swap(0, Ordering::Relaxed) == 0 { + // SAFETY: the mapping is live and valid; the timestamp field is atomic and aligned. 
+ let published_at_atomic = unsafe { &(*header).monotonic_published_at_ns }; + let previous_published_at_ns = published_at_atomic.swap(0, Ordering::Acquire); + if previous_published_at_ns == 0 { return Err(io::Error::other( "concurrent update of the process context is not supported", )); } - fence(Ordering::SeqCst); + // The timestamp also acts as the seqlock version, so it must advance even if the + // clock source returns the same value for two rapid updates. + let monotonic_published_at_ns = + monotonic_published_at_ns.max(previous_published_at_ns.saturating_add(1)); + + fence(Ordering::Release); self.payload = payload; - // SAFETY: we own the mapping, which is live and valid for writes. The header is packed - // and thus has no alignment constraints. + // SAFETY: the mapping is live and valid; the changeable fields are atomic and aligned. unsafe { - (*header).payload_ptr = self.payload.as_ptr(); - (*header).payload_size = payload_size; + (*header) + .payload_ptr + .store(self.payload.as_ptr().cast_mut(), Ordering::Relaxed); + (*header) + .payload_size + .store(payload_size, Ordering::Relaxed); } - fence(Ordering::SeqCst); - published_at_atomic.store(monotonic_published_at_ns, Ordering::Relaxed); + published_at_atomic.store(monotonic_published_at_ns, Ordering::Release); Ok(()) } @@ -340,12 +339,13 @@ pub mod linux { /// Returns `Err` wrapping the current `errno` with `msg` as context if `addr` equals /// `MAP_FAILED`, `Ok(addr)` otherwise. - fn check_mapping_addr(addr: *mut c_void, msg: &'static str) -> io::Result<*mut c_void> { + fn check_mapping_addr(addr: *mut c_void, msg: &'static str) -> io::Result> { if addr == libc::MAP_FAILED { let e = io::Error::last_os_error(); Err(io::Error::new(e.kind(), format!("{msg}: {e}"))) } else { - Ok(addr) + // SAFETY: mmap returns a non-null pointer on success. 
+ Ok(unsafe { NonNull::new_unchecked(addr) }) } } @@ -377,7 +377,7 @@ pub mod linux { } // The returned size is guaranteed to be larger or equal to the size of `MappingHeader`. - fn mapping_size() -> usize { + const fn mapping_size() -> usize { size_of::() } @@ -420,6 +420,9 @@ pub mod linux { return false; }; + // The spec says: + // 1. **Locate mapping**: Parse `/proc//maps` and search for entries with name + // **starting with** `[anon_shmem:OTEL_CTX]`, `[anon:OTEL_CTX]` or `/memfd:OTEL_CTX`. name.starts_with("/memfd:OTEL_CTX") || name.starts_with("[anon_shmem:OTEL_CTX]") || name.starts_with("[anon:OTEL_CTX]") @@ -441,82 +444,140 @@ pub mod linux { } Err(io::Error::new( - io::ErrorKind::NotFound, - "couldn't find the mapping of the OTel process context", + ErrorKind::NotFound, + "couldn't find the mapping of the process context", )) } /// Reads and decodes the current process's OTel process context. pub fn read() -> io::Result { let mapping_addr = find_otel_mapping()?; - let header: *mut MappingHeader = ptr::with_exposed_provenance_mut(mapping_addr); + let header_ptr: *const MappingHeader = ptr::with_exposed_provenance(mapping_addr); // SAFETY: we're reading from our own process memory at an address we found in // /proc/self/maps. The mapping must be readable if it is listed as the OTel context. - let published_at = unsafe { (*header).monotonic_published_at_ns.load(Ordering::Relaxed) }; + let header = unsafe { + if header_ptr.is_null() { + return Err(io::Error::new( + ErrorKind::InvalidData, + "null process context header", + )); + } + &*header_ptr + }; + + let published_at = header.monotonic_published_at_ns.load(Ordering::Acquire); if published_at == 0 { return Err(io::Error::new( - io::ErrorKind::WouldBlock, + ErrorKind::WouldBlock, "process context is currently being updated", )); } - fence(Ordering::SeqCst); - - let (signature, version, payload_size, payload_ptr) = - // SAFETY: a non-zero published timestamp means the header is initialized. 
- if let Some(header) = unsafe { header.as_ref() } { - ( - header.signature, - header.version, - header.payload_size, - header.payload_ptr, - ) - } else { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "null process context header", - )); - }; + + // `signature` and `version` are immutable after the mapping becomes discoverable, + // so there is no change of races that would be UB. The seqlock-controlled fields + // must be loaded atomically because they can change during an update. + // The payload pointed to payload_ptr is also immutable, but it's irrelevant for the + // memory model because it's read with process_vm_readv(). + let signature = header.signature; + let version = header.version; if signature != *SIGNATURE { return Err(io::Error::new( - io::ErrorKind::InvalidData, + ErrorKind::InvalidData, "invalid signature in process context mapping", )); } if version != PROCESS_CTX_VERSION { return Err(io::Error::new( - io::ErrorKind::InvalidData, + ErrorKind::InvalidData, format!("unsupported process context version {version}"), )); } + + let payload_size = header.payload_size.load(Ordering::Relaxed); + let payload_ptr = header.payload_ptr.load(Ordering::Relaxed).cast_const(); + if payload_ptr.is_null() { return Err(io::Error::new( - io::ErrorKind::InvalidData, + ErrorKind::InvalidData, "process context payload pointer is null", )); } - // SAFETY: the publisher stores a pointer to `payload_size` initialized bytes and keeps that - // allocation alive until the next update. The timestamp check below detects concurrent - // updates and discards the read. 
- let payload = unsafe { std::slice::from_raw_parts(payload_ptr, payload_size as usize) }; - let context = ProcessContext::decode(payload) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let payload_bytes = read_process_memory( + unsafe { libc::getpid() }, + payload_ptr, + payload_size as usize, + )?; - fence(Ordering::SeqCst); - let published_at_after = - unsafe { (*header).monotonic_published_at_ns.load(Ordering::Relaxed) }; + // pairs with the first release fence on update() to ensure that, if we read data updated + // after the initial published time, we at least see the published time being set to 0 in + // the next load of the published time (or we could see a later time rather than 0) + fence(Ordering::Acquire); + + let published_at_after = header.monotonic_published_at_ns.load(Ordering::Acquire); if published_at != published_at_after { return Err(io::Error::new( - io::ErrorKind::WouldBlock, + ErrorKind::WouldBlock, "process context changed while being read", )); } + let context = ProcessContext::decode(payload_bytes.as_slice()) + .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?; + Ok(context) } + /// Reads `len` bytes from `addr` in the address space of `pid` via `process_vm_readv(2)`. + /// + /// Returns [`ErrorKind::WouldBlock`] when the remote memory is no longer mapped or only + /// partially readable. The kernel reports the former as [`libc::EFAULT`] from + /// `pin_user_pages_remote()` and the latter as a short read (see `process_vm_rw_core()` in + /// `mm/process_vm_access.c`). + fn read_process_memory(pid: libc::pid_t, addr: *const u8, len: usize) -> io::Result> { + if len == 0 { + return Ok(Vec::new()); + } + + let mut buf = vec![0u8; len]; + let local_iov = libc::iovec { + iov_base: buf.as_mut_ptr().cast(), + iov_len: len, + }; + let remote_iov = libc::iovec { + iov_base: addr.cast_mut().cast(), + iov_len: len, + }; + + // SAFETY: `buf` and `addr` each span `len` bytes for the duration of the syscall. 
+ let nbytes = unsafe { libc::process_vm_readv(pid, &local_iov, 1, &remote_iov, 1, 0) }; + + if nbytes < 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::EFAULT) { + return Err(io::Error::new( + ErrorKind::WouldBlock, + "process context payload was unmapped during read", + )); + } + return Err(io::Error::new( + err.kind(), + format!("failed to read process context payload: {err}"), + )); + } + + if nbytes as usize != len { + return Err(io::Error::new( + ErrorKind::WouldBlock, + "incomplete read of process context payload", + )); + } + + Ok(buf) + } + fn string_array(value: &AnyValue) -> Option> { let any_value::Value::ArrayValue(array) = value.value.as_ref()? else { return None; @@ -630,7 +691,15 @@ pub mod linux { use std::io; use std::sync::atomic::Ordering; - use super::MappingHeader; + use super::{any_value, AnyValue, KeyValue, MappingHeader, ProcessContext}; + + struct MappingHeaderSnapshot { + signature: [u8; 8], + version: u32, + payload_size: u32, + monotonic_published_at_ns: u64, + payload_ptr: *const u8, + } /// Read the process context from the current process. /// @@ -640,11 +709,39 @@ pub mod linux { /// functions it relies on, are specialized for tests (for example, it doesn't check for /// concurrent writers after reading the header, because we know they can't be). Do not /// extract or use as it is as a generic Rust OTel process context reader. - fn read_process_context() -> io::Result { + fn read_process_context() -> io::Result { let mapping_addr = super::find_otel_mapping()?; let header_ptr: *const MappingHeader = std::ptr::with_exposed_provenance(mapping_addr); // SAFETY: the mapping was published by this test before being read. 
- Ok(unsafe { std::ptr::read(header_ptr) }) + let header = unsafe { &*header_ptr }; + Ok(MappingHeaderSnapshot { + signature: header.signature, + version: header.version, + payload_size: header.payload_size.load(Ordering::Relaxed), + monotonic_published_at_ns: header.monotonic_published_at_ns.load(Ordering::Relaxed), + payload_ptr: header.payload_ptr.load(Ordering::Relaxed).cast_const(), + }) + } + + #[test] + #[cfg_attr(miri, ignore)] + fn publish_then_read_process_context() { + let context = ProcessContext { + resource: None, + extra_attributes: vec![KeyValue { + key: "service.name".to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue("checkout".to_string())), + }), + key_ref: 0, + }], + }; + + super::publish(&context).expect("couldn't publish the process context"); + let read_context = super::read().expect("couldn't read back the process context"); + super::unpublish().expect("couldn't unpublish the context"); + + assert!(read_context == context, "read back a different context"); } #[test] @@ -673,12 +770,12 @@ pub mod linux { "wrong payload size" ); assert!( - header.monotonic_published_at_ns.load(Ordering::Relaxed) > 0, + header.monotonic_published_at_ns > 0, "monotonic_published_at_ns is zero" ); assert!(read_payload == payload_v1.as_bytes(), "payload mismatch"); - let published_at_ns_v1 = header.monotonic_published_at_ns.load(Ordering::Relaxed); + let published_at_ns_v1 = header.monotonic_published_at_ns; // Ensure the clock advances so the updated timestamp is strictly greater std::thread::sleep(std::time::Duration::from_nanos(10)); @@ -702,7 +799,7 @@ pub mod linux { "wrong payload size" ); assert!( - header.monotonic_published_at_ns.load(Ordering::Relaxed) > published_at_ns_v1, + header.monotonic_published_at_ns > published_at_ns_v1, "published_at_ns should be strictly greater after update" ); assert!(read_payload == payload_v2.as_bytes(), "payload mismatch"); From f9f9cb3f9055c47c26ba9ae4e5d46ad15179170a Mon Sep 17 00:00:00 
2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Thu, 2 Jul 2026 13:30:36 +0100 Subject: [PATCH 06/14] ProcessContextSelfReader to avoid rediscovering the mapping every read --- libdd-library-config/src/otel_process_ctx.rs | 371 +++++++++++-------- 1 file changed, 220 insertions(+), 151 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index ab0155a018..c4aba13d2a 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -3,6 +3,40 @@ //! Implementation of the publisher part of the [OTEL process //! context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719) +//! specification. +//! +//! Implements a seqlock-style algorithm, which generally goes like this: +//! +//! atomic seq{0}; +//! atomic data1, data2; +//! T reader() { +//! int r1, r2; +//! unsigned seq0, seq1; +//! do { +//! seq0 = seq.load(m_o_acquire); +//! r1 = data1.load(m_o_relaxed); +//! r2 = data2.load(m_o_relaxed); +//! atomic_thread_fence(m_o_acquire); +//! seq1 = seq.load(m_o_relaxed); +//! } while (seq0 & 1 || seq0 != seq1); +//! ... +//! } +//! +//! void writer(...) { +//! unsigned seq0 = seq.load(m_o_relaxed); +//! while (seq0 & 1 || +//! !seq.compare_exchange_weak(seq0, seq0 + 1, m_o_acquire)) {} +//! atomic_thread_fence(m_o_release); +//! data1.store(..., m_o_relaxed); +//! data2.store(..., m_o_relaxed); +//! seq.store(seq0 + 2, m_o_release); +//! } +//! +//! Although we instead use 0 to signal the writer is progress and a timestamp +//! instead of even numbers. +//! We also forbid concurrent writers, and leave the reader retries to the +//! discretion of the caller. +//! We ignore the corner case where time returns 0. 
#[cfg(target_os = "linux")] #[cfg(target_has_atomic = "64")] @@ -10,7 +44,7 @@ pub mod linux { use std::{ ffi::{c_void, CStr}, fs::File, - io::{self, BufRead, BufReader, ErrorKind}, + io::{self, BufRead, BufReader}, mem::{size_of, ManuallyDrop}, os::fd::{AsRawFd, FromRawFd, OwnedFd}, ptr::{self, NonNull}, @@ -36,18 +70,10 @@ pub mod linux { /// The header structure written at the start of the mapping. This must match the C /// layout of the specification. /// - /// # Atomic accesses - /// - /// The publishing protocol requires some form of synchronization. Using fences or any non-OS - /// based synchronization requires the use of atomics to have any effect (see [Mandatory - /// atomic](https://doc.rust-lang.org/std/sync/atomic/fn.fence.html#mandatory-atomic)) - /// - /// We use `monotonic_published_at_ns` for synchronization with the reader. `AtomicU64` has the - /// same in-memory representation as `u64` and is 8-bytes aligned. `payload_size` and - /// `payload_ptr` are atomic too because they are modified while readers may be sampling the - /// header. The timestamp field lands at offset 16 in the struct (after 8 bytes of signature + - /// 4 bytes version + 4 bytes payload_size), which satisfies that alignment on any - /// page-aligned base address. + /// The seqlock algorithm is inherently racy, so we need all accesses to be atomic + /// (even if relaxed); otherwise we hit UB. The only exception is the reading the + /// payload through process_vm_readv(), which is a syscall and so falls outside of + /// the scope of the memory model. #[repr(C)] struct MappingHeader { signature: [u8; 8], @@ -405,177 +431,219 @@ pub mod linux { }) } - /// Parses the start address from a /proc/self/maps line. - fn parse_mapping_start(line: &str) -> Option { - usize::from_str_radix(line.split('-').next()?, 16).ok() + /// Reader for the current process's OTel process context mapping. + /// + /// Locates the OTEL_CTX mapping at construction. 
Call [`read`](Self::read) repeatedly to fetch + /// updated context data without re-parsing `/proc/self/maps`, as long as the process has not + /// forked. After a `fork()`, reads fail and a new reader must be constructed. + pub struct ProcessContextSelfReader { + pid: libc::pid_t, + header_ptr: NonNull, } - /// Checks if a mapping line refers to the OTEL_CTX mapping. - fn is_named_otel_mapping(line: &str) -> bool { - let trimmed = line.trim_end(); + // SAFETY: ProcessContextSelfReader doesn't rely on thread local state and + // only references static memory -- owns nothing. + unsafe impl Send for ProcessContextSelfReader {} + // SAFETY: ProcessContextSelfReader doesn't modify anything + unsafe impl Sync for ProcessContextSelfReader {} + + impl ProcessContextSelfReader { + /// Locates the OTEL_CTX mapping in `/proc/self/maps`. + pub fn new() -> io::Result { + let mapping_addr = Self::find_otel_mapping()?; + // SAFETY: getpid() is always safe to call. + let pid = unsafe { libc::getpid() }; + Ok(Self { + pid, + header_ptr: Self::header_ptr_from_addr(mapping_addr)?, + }) + } - // The name of the mapping is the 6th column. The separator changes (both ' ' and '\t') - // but `split_whitespace()` takes care of that. - let Some(name) = trimmed.split_whitespace().nth(5) else { - return false; - }; + /// Reads and decodes the current process's OTel process context. + pub fn read(&self) -> io::Result { + // SAFETY: getpid() is always safe to call. + let current_pid = unsafe { libc::getpid() }; + if current_pid != self.pid { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "process context reader is stale after fork; construct a new reader", + )); + } - // The spec says: - // 1. **Locate mapping**: Parse `/proc//maps` and search for entries with name - // **starting with** `[anon_shmem:OTEL_CTX]`, `[anon:OTEL_CTX]` or `/memfd:OTEL_CTX`. 
- name.starts_with("/memfd:OTEL_CTX") - || name.starts_with("[anon_shmem:OTEL_CTX]") - || name.starts_with("[anon:OTEL_CTX]") - } + // SAFETY: `header_ptr` is non-null and points to our own process memory at an address + // we found in /proc/self/maps for `self.pid`. The mapping must be readable if it is + // listed as the OTel context. + let header = unsafe { self.header_ptr.as_ref() }; - /// Find the OTEL_CTX mapping in /proc/self/maps. - fn find_otel_mapping() -> io::Result { - let file = File::open("/proc/self/maps")?; - let reader = BufReader::new(file); + let published_at = header.monotonic_published_at_ns.load(Ordering::Acquire); + if published_at == 0 { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context is currently being updated", + )); + } - for line in reader.lines() { - let line = line?; + // `signature` and `version` are immutable after the mapping becomes discoverable, + // so there is no change of races that would be UB. The seqlock-controlled fields + // must be loaded atomically because they can change during an update. + // The payload pointed to payload_ptr is also immutable, but it's irrelevant for the + // memory model because it's read with process_vm_readv(). 
+ let signature = header.signature; + let version = header.version; - if is_named_otel_mapping(&line) { - if let Some(addr) = parse_mapping_start(&line) { - return Ok(addr); - } + if signature != *SIGNATURE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid signature in process context mapping", + )); + } + if version != PROCESS_CTX_VERSION { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unsupported process context version {version}"), + )); } - } - Err(io::Error::new( - ErrorKind::NotFound, - "couldn't find the mapping of the process context", - )) - } + let payload_size = header.payload_size.load(Ordering::Relaxed); + let payload_ptr = header.payload_ptr.load(Ordering::Relaxed).cast_const(); - /// Reads and decodes the current process's OTel process context. - pub fn read() -> io::Result { - let mapping_addr = find_otel_mapping()?; - let header_ptr: *const MappingHeader = ptr::with_exposed_provenance(mapping_addr); + if payload_ptr.is_null() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "process context payload pointer is null", + )); + } + + let payload_bytes = + Self::read_process_memory(self.pid, payload_ptr, payload_size as usize)?; - // SAFETY: we're reading from our own process memory at an address we found in - // /proc/self/maps. The mapping must be readable if it is listed as the OTel context. 
- let header = unsafe { - if header_ptr.is_null() { + // pairs with the first release fence on update() to ensure that, if we read data updated + // after the initial published time, we at least see the published time being set to 0 in + // the next load of the published time (or we could see a later time rather than 0) + fence(Ordering::Acquire); + + let published_at_after = header.monotonic_published_at_ns.load(Ordering::Relaxed); + if published_at != published_at_after { return Err(io::Error::new( - ErrorKind::InvalidData, - "null process context header", + io::ErrorKind::WouldBlock, + "process context changed while being read", )); } - &*header_ptr - }; - let published_at = header.monotonic_published_at_ns.load(Ordering::Acquire); - if published_at == 0 { - return Err(io::Error::new( - ErrorKind::WouldBlock, - "process context is currently being updated", - )); - } + let context = ProcessContext::decode(payload_bytes.as_slice()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; - // `signature` and `version` are immutable after the mapping becomes discoverable, - // so there is no change of races that would be UB. The seqlock-controlled fields - // must be loaded atomically because they can change during an update. - // The payload pointed to payload_ptr is also immutable, but it's irrelevant for the - // memory model because it's read with process_vm_readv(). 
- let signature = header.signature; - let version = header.version; - - if signature != *SIGNATURE { - return Err(io::Error::new( - ErrorKind::InvalidData, - "invalid signature in process context mapping", - )); + Ok(context) } - if version != PROCESS_CTX_VERSION { - return Err(io::Error::new( - ErrorKind::InvalidData, - format!("unsupported process context version {version}"), - )); + + fn header_ptr_from_addr(mapping_addr: usize) -> io::Result> { + NonNull::new(ptr::with_exposed_provenance::(mapping_addr).cast_mut()) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "null process context header") + }) } - let payload_size = header.payload_size.load(Ordering::Relaxed); - let payload_ptr = header.payload_ptr.load(Ordering::Relaxed).cast_const(); + /// Find the OTEL_CTX mapping in /proc/self/maps. + fn find_otel_mapping() -> io::Result { + let file = File::open("/proc/self/maps")?; + let reader = BufReader::new(file); + + for line in reader.lines() { + let line = line?; + + if Self::is_named_otel_mapping(&line) { + if let Some(addr) = Self::parse_mapping_start(&line) { + return Ok(addr); + } + } + } - if payload_ptr.is_null() { - return Err(io::Error::new( - ErrorKind::InvalidData, - "process context payload pointer is null", - )); + Err(io::Error::new( + io::ErrorKind::NotFound, + "couldn't find the mapping of the process context", + )) } - let payload_bytes = read_process_memory( - unsafe { libc::getpid() }, - payload_ptr, - payload_size as usize, - )?; - - // pairs with the first release fence on update() to ensure that, if we read data updated - // after the initial published time, we at least see the published time being set to 0 in - // the next load of the published time (or we could see a later time rather than 0) - fence(Ordering::Acquire); - - let published_at_after = header.monotonic_published_at_ns.load(Ordering::Acquire); - if published_at != published_at_after { - return Err(io::Error::new( - ErrorKind::WouldBlock, - "process context 
changed while being read", - )); + /// Parses the start address from a /proc/self/maps line. + fn parse_mapping_start(line: &str) -> Option { + usize::from_str_radix(line.split('-').next()?, 16).ok() } - let context = ProcessContext::decode(payload_bytes.as_slice()) - .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?; + /// Checks if a mapping line refers to the OTEL_CTX mapping. + fn is_named_otel_mapping(line: &str) -> bool { + let trimmed = line.trim_end(); - Ok(context) - } + // The name of the mapping is the 6th column. The separator changes (both ' ' and '\t') + // but `split_whitespace()` takes care of that. + let Some(name) = trimmed.split_whitespace().nth(5) else { + return false; + }; - /// Reads `len` bytes from `addr` in the address space of `pid` via `process_vm_readv(2)`. - /// - /// Returns [`ErrorKind::WouldBlock`] when the remote memory is no longer mapped or only - /// partially readable. The kernel reports the former as [`libc::EFAULT`] from - /// `pin_user_pages_remote()` and the latter as a short read (see `process_vm_rw_core()` in - /// `mm/process_vm_access.c`). - fn read_process_memory(pid: libc::pid_t, addr: *const u8, len: usize) -> io::Result> { - if len == 0 { - return Ok(Vec::new()); + // The spec says: + // 1. **Locate mapping**: Parse `/proc//maps` and search for entries with name + // **starting with** `[anon_shmem:OTEL_CTX]`, `[anon:OTEL_CTX]` or `/memfd:OTEL_CTX`. + name.starts_with("/memfd:OTEL_CTX") + || name.starts_with("[anon_shmem:OTEL_CTX]") + || name.starts_with("[anon:OTEL_CTX]") } - let mut buf = vec![0u8; len]; - let local_iov = libc::iovec { - iov_base: buf.as_mut_ptr().cast(), - iov_len: len, - }; - let remote_iov = libc::iovec { - iov_base: addr.cast_mut().cast(), - iov_len: len, - }; + /// Reads `len` bytes from `addr` in the address space of `pid` via `process_vm_readv(2)`. + /// + /// Returns [`ErrorKind::WouldBlock`] when the remote memory is no longer mapped or only + /// partially readable. 
The kernel reports the former as [`libc::EFAULT`] from + /// `pin_user_pages_remote()` and the latter as a short read (see `process_vm_rw_core()` in + /// `mm/process_vm_access.c`). + fn read_process_memory( + pid: libc::pid_t, + addr: *const u8, + len: usize, + ) -> io::Result> { + if len == 0 { + return Ok(Vec::new()); + } - // SAFETY: `buf` and `addr` each span `len` bytes for the duration of the syscall. - let nbytes = unsafe { libc::process_vm_readv(pid, &local_iov, 1, &remote_iov, 1, 0) }; + let mut buf = vec![0u8; len]; + let local_iov = libc::iovec { + iov_base: buf.as_mut_ptr().cast(), + iov_len: len, + }; + let remote_iov = libc::iovec { + iov_base: addr.cast_mut().cast(), + iov_len: len, + }; - if nbytes < 0 { - let err = io::Error::last_os_error(); - if err.raw_os_error() == Some(libc::EFAULT) { + // SAFETY: `buf` and `addr` each span `len` bytes for the duration of the syscall. + let nbytes = unsafe { libc::process_vm_readv(pid, &local_iov, 1, &remote_iov, 1, 0) }; + + if nbytes < 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::EFAULT) { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context payload was unmapped during read", + )); + } return Err(io::Error::new( - ErrorKind::WouldBlock, - "process context payload was unmapped during read", + err.kind(), + format!("failed to read process context payload: {err}"), )); } - return Err(io::Error::new( - err.kind(), - format!("failed to read process context payload: {err}"), - )); - } - if nbytes as usize != len { - return Err(io::Error::new( - ErrorKind::WouldBlock, - "incomplete read of process context payload", - )); + if nbytes as usize != len { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "incomplete read of process context payload", + )); + } + + Ok(buf) } + } - Ok(buf) + /// Reads and decodes the current process's OTel process context. + /// To read multiple times, construct a new reader with [`ProcessContextSelfReader::new()`]. 
+ pub fn read() -> io::Result { + ProcessContextSelfReader::new()?.read() } fn string_array(value: &AnyValue) -> Option> { @@ -710,7 +778,7 @@ pub mod linux { /// concurrent writers after reading the header, because we know they can't be). Do not /// extract or use as it is as a generic Rust OTel process context reader. fn read_process_context() -> io::Result { - let mapping_addr = super::find_otel_mapping()?; + let mapping_addr = super::ProcessContextSelfReader::find_otel_mapping()?; let header_ptr: *const MappingHeader = std::ptr::with_exposed_provenance(mapping_addr); // SAFETY: the mapping was published by this test before being read. let header = unsafe { &*header_ptr }; @@ -816,13 +884,14 @@ pub mod linux { .expect("couldn't publish the process context"); // The mapping must be discoverable right after publishing - super::find_otel_mapping().expect("couldn't find the otel mapping after publishing"); + super::ProcessContextSelfReader::find_otel_mapping() + .expect("couldn't find the otel mapping after publishing"); super::unpublish().expect("couldn't unpublish the context"); // After unpublishing the name must no longer appear in /proc/self/maps assert!( - super::find_otel_mapping().is_err(), + super::ProcessContextSelfReader::find_otel_mapping().is_err(), "otel mapping should not be visible after unpublish" ); } From 7b89ee6547e19bc4cbd8aa2fd53119473fc9e5ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Thu, 2 Jul 2026 13:38:37 +0100 Subject: [PATCH 07/14] make unpublish() unsafe --- libdd-library-config/src/otel_process_ctx.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index c4aba13d2a..eff12e4bb3 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -743,7 +743,12 @@ pub mod linux { /// this is no-op. 
/// /// A call to [publish] following an [unpublish] will create a new mapping. - pub fn unpublish() -> io::Result<()> { + /// + /// # Safety + /// + /// This function may only be called if it can be guaranteed that there are no in-process + /// readers, or at least that they will not be used after the call. + pub unsafe fn unpublish() -> io::Result<()> { let mut guard = lock_context_handle()?; if let Some(ProcessContextHandle { mapping, .. }) = guard.take() { @@ -807,7 +812,9 @@ pub mod linux { super::publish(&context).expect("couldn't publish the process context"); let read_context = super::read().expect("couldn't read back the process context"); - super::unpublish().expect("couldn't unpublish the context"); + unsafe { + super::unpublish().expect("couldn't unpublish the context"); + } assert!(read_context == context, "read back a different context"); } @@ -872,7 +879,9 @@ pub mod linux { ); assert!(read_payload == payload_v2.as_bytes(), "payload mismatch"); - super::unpublish().expect("couldn't unpublish the context"); + unsafe { + super::unpublish().expect("couldn't unpublish the context"); + } } #[test] @@ -887,7 +896,9 @@ pub mod linux { super::ProcessContextSelfReader::find_otel_mapping() .expect("couldn't find the otel mapping after publishing"); - super::unpublish().expect("couldn't unpublish the context"); + unsafe { + super::unpublish().expect("couldn't unpublish the context"); + } // After unpublishing the name must no longer appear in /proc/self/maps assert!( From 0426fc4f869c64af5483c12fb2d7a41e6754aee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Thu, 2 Jul 2026 13:55:23 +0100 Subject: [PATCH 08/14] fix race in the memfd path In the memfd path, the header becomes immediately discoverable. Therefore, we cannot do a non-atomic write to monotonic_published_at_ns with value 0, as this could race with a reader that did an acquire read on monotonic_published_at_ns. 
This write is also unnecessary because the mapping comes zero-initialized, and we'd be writing a zero (so it would also _likely_ be harmless anyway, even if violating the memory model). Note that there can be no race in signature/version. These are only read if the published_at is nonzero, and the release-acquire relationship guarantees that their final non-zero values are visible if published_at is nonzero. --- libdd-library-config/src/otel_process_ctx.rs | 36 +++++++++----------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index eff12e4bb3..5a3ee8a1d5 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -261,6 +261,11 @@ pub mod linux { impl ProcessContextHandle { /// Initial publication of the process context. Creates an appropriate memory mapping. fn publish(payload: Vec) -> io::Result { + let payload_size = payload + .len() + .try_into() + .map_err(|_| io::Error::other("payload size overflowed"))?; + let mut mapping = MemMapping::new()?; let size = mapping_size(); check_syscall_retval( @@ -276,27 +281,18 @@ pub mod linux { let header = mapping.start_addr.as_ptr() as *mut MappingHeader; + // SAFETY: header points to a zero-filled, page-aligned mapping of at least + // mapping_size() bytes; field projections are in-bounds and aligned. unsafe { - // SAFETY: header points to a freshly mmaped region valid for at least - // `mapping_size()` bytes, which we ensure is >= size_of::(). The - // base address is page-aligned, so all fields including `monotonic_published_at_ns` - // (at offset 16) satisfy their alignment requirements. 
- ptr::write( - header, - MappingHeader { - signature: *SIGNATURE, - version: PROCESS_CTX_VERSION, - payload_size: AtomicU32::new( - payload - .len() - .try_into() - .map_err(|_| io::Error::other("payload size overflowed"))?, - ), - // will be set atomically at last - monotonic_published_at_ns: AtomicU64::new(0), - payload_ptr: AtomicPtr::new(payload.as_ptr().cast_mut()), - }, - ); + ptr::addr_of_mut!((*header).signature).write(*SIGNATURE); + ptr::addr_of_mut!((*header).version).write(PROCESS_CTX_VERSION); + (*header) + .payload_size + .store(payload_size, Ordering::Relaxed); + (*header) + .payload_ptr + .store(payload.as_ptr().cast_mut(), Ordering::Relaxed); + (*header) .monotonic_published_at_ns .store(published_at_ns, Ordering::Release); From a432165fa25fe371cf0a472612203f7efc0531eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Thu, 2 Jul 2026 13:59:53 +0100 Subject: [PATCH 09/14] reorder drop of payload and unpublishing of header --- libdd-library-config/src/otel_process_ctx.rs | 21 ++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index 5a3ee8a1d5..c99c8abfc9 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -747,8 +747,25 @@ pub mod linux { pub unsafe fn unpublish() -> io::Result<()> { let mut guard = lock_context_handle()?; - if let Some(ProcessContextHandle { mapping, .. }) = guard.take() { - mapping.free()?; + if let Some(ProcessContextHandle { + mapping, payload, .. + }) = guard.take() + { + // Mark the context as being-updated and order that store before the payload free. + // + // SAFETY: the mapping is still live and valid; the timestamp field is atomic + // and aligned. 
+ let header = mapping.start_addr.as_ptr() as *mut MappingHeader; + unsafe { + (*header) + .monotonic_published_at_ns + .store(0, Ordering::Relaxed); + } + fence(Ordering::Release); + + mapping.free()?; // payload will still drop if it fails + // but we'll be stuck with a zero timestamp + drop(payload); } Ok(()) From 052fb8430506529dfc9d220a46a07c229cb41042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Thu, 2 Jul 2026 14:22:21 +0100 Subject: [PATCH 10/14] cargo +nightly fmt --- libdd-library-config/src/otel_process_ctx.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index c99c8abfc9..da25be757a 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -513,9 +513,10 @@ pub mod linux { let payload_bytes = Self::read_process_memory(self.pid, payload_ptr, payload_size as usize)?; - // pairs with the first release fence on update() to ensure that, if we read data updated - // after the initial published time, we at least see the published time being set to 0 in - // the next load of the published time (or we could see a later time rather than 0) + // pairs with the first release fence on update() to ensure that, if we read data + // updated after the initial published time, we at least see the published + // time being set to 0 in the next load of the published time (or we could + // see a later time rather than 0) fence(Ordering::Acquire); let published_at_after = header.monotonic_published_at_ns.load(Ordering::Relaxed); From b92917bee845211cc5e1bb2955c4d64e95812a23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Fri, 3 Jul 2026 16:10:27 +0100 Subject: [PATCH 11/14] address otel process context review --- libdd-library-config/src/otel_process_ctx.rs | 184 ++++++++++++------- 1 file changed, 120 
insertions(+), 64 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index da25be757a..40f8d999c9 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -1,42 +1,28 @@ // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! Implementation of the publisher part of the [OTEL process -//! context](https://github.com/open-telemetry/opentelemetry-specification/pull/4719) -//! specification. +//! Implementation of the publisher and same-process reader parts of the [OTEL process +//! context specification](https://github.com/open-telemetry/opentelemetry-specification/pull/4719). //! -//! Implements a seqlock-style algorithm, which generally goes like this: +//! The update/read protocol is seqlock-style: the publisher marks the mapping as unavailable, +//! writes the payload metadata, publishes a non-zero version, and readers accept a copy only if +//! the version they observed before copying still matches afterward. The general algorithm and +//! the C++ memory-model constraints are described in Boehm's +//! [Can Seqlocks Get Along With Programming Language Memory Models?](https://web.archive.org/web/20211106170334/https://www.hpl.hp.com/techreports/2012/HPL-2012-68.pdf). +//! Linux has its own [seqlock/seqcount implementation](https://github.com/torvalds/linux/blob/master/include/linux/seqlock.h), +//! but its barriers are specified by the Linux kernel memory model, not by the C++/Rust models. //! -//! atomic seq{0}; -//! atomic data1, data2; -//! T reader() { -//! int r1, r2; -//! unsigned seq0, seq1; -//! do { -//! seq0 = seq.load(m_o_acquire); -//! r1 = data1.load(m_o_relaxed); -//! r2 = data2.load(m_o_relaxed); -//! atomic_thread_fence(m_o_acquire); -//! seq1 = seq.load(m_o_relaxed); -//! } while (seq0 & 1 || seq0 != seq1); -//! ... -//! } +//! 
This implementation differs from the usual odd/even counter form in two ways: `0` is the +//! in-progress sentinel, and each non-zero `monotonic_published_at_ns` value is the +//! reader-visible version. Updates force that timestamp to advance so readers can detect torn +//! reads even when the clock returns the same value twice. Concurrent writers are rejected, and +//! retry policy is left to the reader's caller. //! -//! void writer(...) { -//! unsigned seq0 = seq.load(m_o_relaxed); -//! while (seq0 & 1 || -//! !seq.compare_exchange_weak(seq0, seq0 + 1, m_o_acquire)) {} -//! atomic_thread_fence(m_o_release); -//! data1.store(..., m_o_relaxed); -//! data2.store(..., m_o_relaxed); -//! seq.store(seq0 + 2, m_o_release); -//! } -//! -//! Although we instead use 0 to signal the writer is progress and a timestamp -//! instead of even numbers. -//! We also forbid concurrent writers, and leave the reader retries to the -//! discretion of the caller. -//! We ignore the corner case where time returns 0. +//! Process context sharing also crosses Rust's memory model boundary. In-process header fields +//! that can change during publication are atomics, while payload bytes are copied with +//! `process_vm_readv`; that syscall turns accesses to reclaimed payload memory that has been +//! unmapped into a syscall error or short read instead of a segfault, but its ordering relative to +//! the publisher has to be reasoned about at the OS/architecture level rather than only in Rust. #[cfg(target_os = "linux")] #[cfg(target_has_atomic = "64")] @@ -55,6 +41,9 @@ pub mod linux { time::Duration, }; + #[cfg(debug_assertions)] + use std::sync::atomic::AtomicUsize; + use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ any_value, AnyValue, KeyValue, ProcessContext, }; @@ -66,14 +55,16 @@ pub mod linux { pub const SIGNATURE: &[u8; 8] = b"OTEL_CTX"; /// The discoverable name of the memory mapping. 
pub const MAPPING_NAME: &CStr = c"OTEL_CTX"; + /// Sentinel timestamp indicating that the context is unpublished or being updated. + const UNPUBLISHED_OR_UPDATING: u64 = 0; /// The header structure written at the start of the mapping. This must match the C /// layout of the specification. /// - /// The seqlock algorithm is inherently racy, so we need all accesses to be atomic - /// (even if relaxed); otherwise we hit UB. The only exception is the reading the - /// payload through process_vm_readv(), which is a syscall and so falls outside of - /// the scope of the memory model. + /// The seqlock algorithm is inherently racy, so fields that can change during an + /// update must be atomic (even if accessed relaxed); otherwise we hit UB. `signature` + /// and `version` are intentionally plain fields; see the read-side synchronization comment in + /// [`ProcessContextSelfReader::read`] for why their accesses are race-free. #[repr(C)] struct MappingHeader { signature: [u8; 8], @@ -97,6 +88,7 @@ pub mod linux { assert!(core::mem::align_of::() == 8); assert!(core::mem::align_of::() == core::mem::align_of::()); assert!(core::mem::align_of::>() == core::mem::align_of::<*mut u8>()); + assert!(size_of::<*mut u8>() == size_of::()); }; /// The shared memory mapped area to publish the context to. The memory region is owned by a @@ -124,13 +116,19 @@ pub mod linux { /// single thread should handle context updates, even if it's not strictly required. static PROCESS_CONTEXT_HANDLER: Mutex> = Mutex::new(None); + #[cfg(debug_assertions)] + static LIVE_SELF_READERS: AtomicUsize = AtomicUsize::new(0); + impl MemMapping { /// Creates a suitable memory mapping for the context protocol to be published. /// /// `memfd` is the preferred method, but this function fallbacks to an anonymous mapping if /// `memfd` failed for any reason. /// - /// The memory is guaranteed to initialized to zeroes. + /// The memory is guaranteed to be initialized to zeroes. 
This matters because a + /// memfd-backed mapping is discoverable before `set_name()` runs, so early readers may + /// race with header initialization. They must observe [`UNPUBLISHED_OR_UPDATING`] (0) and + /// stop until the final timestamp store publishes the initialized header. fn new() -> io::Result { let size = mapping_size(); @@ -261,7 +259,7 @@ pub mod linux { impl ProcessContextHandle { /// Initial publication of the process context. Creates an appropriate memory mapping. fn publish(payload: Vec) -> io::Result { - let payload_size = payload + let payload_size: u32 = payload .len() .try_into() .map_err(|_| io::Error::other("payload size overflowed"))?; @@ -318,18 +316,24 @@ pub mod linux { let monotonic_published_at_ns = since_boottime_ns() .ok_or_else(|| io::Error::other("could not get the current timestamp"))?; - let payload_size = payload.len().try_into().map_err(|_| { + let payload_size: u32 = payload.len().try_into().map_err(|_| { io::Error::other("couldn't update process context: new payload too large") })?; - // A process shouldn't try to concurrently update its own context + // A process shouldn't try to concurrently update its own context. + // + // `UNPUBLISHED_OR_UPDATING` is an out-of-band sentinel, not a value that + // `CLOCK_BOOTTIME` is expected to produce after publication. Published non-zero + // timestamp values must advance monotonically; the field may temporarily hold the + // sentinel while an update is in progress. // // Note: be careful of early return while `monotonic_published_at` is still zero, as // this would effectively "lock" any future publishing. Move throwing code above this // swap, or properly restore the previous value if the former can't be done. // SAFETY: the mapping is live and valid; the timestamp field is atomic and aligned. 
let published_at_atomic = unsafe { &(*header).monotonic_published_at_ns }; - let previous_published_at_ns = published_at_atomic.swap(0, Ordering::Acquire); - if previous_published_at_ns == 0 { + let previous_published_at_ns = + published_at_atomic.swap(UNPUBLISHED_OR_UPDATING, Ordering::Acquire); + if previous_published_at_ns == UNPUBLISHED_OR_UPDATING { return Err(io::Error::other( "concurrent update of the process context is not supported", )); @@ -340,6 +344,10 @@ pub mod linux { let monotonic_published_at_ns = monotonic_published_at_ns.max(previous_published_at_ns.saturating_add(1)); + // Pair this with the reader's acquire fence before its second timestamp load. If a + // reader starts from the previous non-zero timestamp but copies data after this update + // begins, it must not accept that copy as the previous version: its final timestamp + // check should see `UNPUBLISHED_OR_UPDATING` or the later published timestamp. fence(Ordering::Release); self.payload = payload; @@ -399,7 +407,7 @@ pub mod linux { } // The returned size is guaranteed to be larger or equal to the size of `MappingHeader`. - const fn mapping_size() -> usize { + fn mapping_size() -> usize { size_of::() } @@ -449,10 +457,13 @@ pub mod linux { let mapping_addr = Self::find_otel_mapping()?; // SAFETY: getpid() is always safe to call. let pid = unsafe { libc::getpid() }; - Ok(Self { + let reader = Self { pid, header_ptr: Self::header_ptr_from_addr(mapping_addr)?, - }) + }; + #[cfg(debug_assertions)] + LIVE_SELF_READERS.fetch_add(1, Ordering::Relaxed); + Ok(reader) } /// Reads and decodes the current process's OTel process context. 
@@ -472,18 +483,19 @@ pub mod linux { let header = unsafe { self.header_ptr.as_ref() }; let published_at = header.monotonic_published_at_ns.load(Ordering::Acquire); - if published_at == 0 { + if published_at == UNPUBLISHED_OR_UPDATING { return Err(io::Error::new( io::ErrorKind::WouldBlock, "process context is currently being updated", )); } - // `signature` and `version` are immutable after the mapping becomes discoverable, - // so there is no change of races that would be UB. The seqlock-controlled fields - // must be loaded atomically because they can change during an update. - // The payload pointed to payload_ptr is also immutable, but it's irrelevant for the - // memory model because it's read with process_vm_readv(). + // `signature` and `version` are initialized before the release store that publishes + // a non-zero timestamp. If the acquire load above observed that timestamp, those + // writes are visible; if it observed `UNPUBLISHED_OR_UPDATING`, we returned before + // reading them. Updates never mutate these fields, so their accesses are race-free. + // The seqlock-controlled fields must be loaded atomically because they can change + // during an update. let signature = header.signature; let version = header.version; @@ -576,20 +588,20 @@ pub mod linux { return false; }; - // The spec says: - // 1. **Locate mapping**: Parse `/proc//maps` and search for entries with name - // **starting with** `[anon_shmem:OTEL_CTX]`, `[anon:OTEL_CTX]` or `/memfd:OTEL_CTX`. - name.starts_with("/memfd:OTEL_CTX") - || name.starts_with("[anon_shmem:OTEL_CTX]") - || name.starts_with("[anon:OTEL_CTX]") + // The OTel process context spec says to search for entries whose names start with + // these prefixes. In `/proc//maps`, however, the optional ` (deleted)` suffix is + // emitted as a separate token, so the mapping-name token itself should match exactly. 
+ name == "/memfd:OTEL_CTX" + || name == "[anon_shmem:OTEL_CTX]" + || name == "[anon:OTEL_CTX]" } /// Reads `len` bytes from `addr` in the address space of `pid` via `process_vm_readv(2)`. /// - /// Returns [`ErrorKind::WouldBlock`] when the remote memory is no longer mapped or only - /// partially readable. The kernel reports the former as [`libc::EFAULT`] from - /// `pin_user_pages_remote()` and the latter as a short read (see `process_vm_rw_core()` in - /// `mm/process_vm_access.c`). + /// Returns [`ErrorKind::WouldBlock`] for retryable races where the remote memory is no + /// longer mapped or only partially readable. The kernel reports the former as + /// [`libc::EFAULT`] from `pin_user_pages_remote()` and the latter as a short read (see + /// `process_vm_rw_core()` in `mm/process_vm_access.c`). fn read_process_memory( pid: libc::pid_t, addr: *const u8, @@ -637,6 +649,13 @@ pub mod linux { } } + #[cfg(debug_assertions)] + impl Drop for ProcessContextSelfReader { + fn drop(&mut self) { + LIVE_SELF_READERS.fetch_sub(1, Ordering::Relaxed); + } + } + /// Reads and decodes the current process's OTel process context. /// To read multiple times, construct a new reader with [`ProcessContextSelfReader::new()`]. pub fn read() -> io::Result { @@ -658,6 +677,8 @@ pub mod linux { .collect() } + // The process context only carries a small resource/extra attribute set, so a linear scan + // keeps this helper allocation-free and simpler than building a temporary index. fn find_attr<'a>(attrs: &'a [KeyValue], key: &str) -> Option<&'a AnyValue> { attrs .iter() @@ -748,11 +769,23 @@ pub mod linux { pub unsafe fn unpublish() -> io::Result<()> { let mut guard = lock_context_handle()?; + #[cfg(debug_assertions)] + debug_assert_eq!( + LIVE_SELF_READERS.load(Ordering::Relaxed), + 0, + "unpublish called while ProcessContextSelfReader instances are live" + ); + if let Some(ProcessContextHandle { mapping, payload, .. 
}) = guard.take() { - // Mark the context as being-updated and order that store before the payload free. + // Mark the context as unavailable before freeing the mapping/payload. The fence forces + // the writing CPU not to reorder the unavailable timestamp store and the deallocation + // stores. Formal correctness, which this does not aim for, only applies to in-process + // readers, and those are forbidden to read after `unpublish()` because they could also + // hit an unmapped header, which is why this function is unsafe. The ordering should + // help out-of-process readers. // // SAFETY: the mapping is still live and valid; the timestamp field is atomic // and aligned. @@ -760,7 +793,7 @@ pub mod linux { unsafe { (*header) .monotonic_published_at_ns - .store(0, Ordering::Relaxed); + .store(UNPUBLISHED_OR_UPDATING, Ordering::Relaxed); } fence(Ordering::Release); @@ -810,6 +843,29 @@ pub mod linux { }) } + #[test] + fn is_named_otel_mapping_matches_exact_mapping_name() { + assert!(super::ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX" + )); + assert!(super::ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX (deleted)" + )); + assert!(super::ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon_shmem:OTEL_CTX]" + )); + assert!(super::ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon:OTEL_CTX]" + )); + + assert!(!super::ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX_BACKUP" + )); + assert!(!super::ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon:OTEL_CTX_old]" + )); + } + #[test] #[cfg_attr(miri, ignore)] fn publish_then_read_process_context() { From bf28bde1300c40e910f81926acb2d155280a7b69 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Fri, 3 Jul 2026 16:19:37 +0100 Subject: [PATCH 12/14] refactor(library-config): split otel process context reader --- libdd-library-config/src/otel_process_ctx.rs | 258 +---------------- .../src/otel_process_ctx/linux/self_reader.rs | 268 ++++++++++++++++++ 2 files changed, 273 insertions(+), 253 deletions(-) create mode 100644 libdd-library-config/src/otel_process_ctx/linux/self_reader.rs diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index 40f8d999c9..eb3e3919c6 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -29,8 +29,7 @@ pub mod linux { use std::{ ffi::{c_void, CStr}, - fs::File, - io::{self, BufRead, BufReader}, + io, mem::{size_of, ManuallyDrop}, os::fd::{AsRawFd, FromRawFd, OwnedFd}, ptr::{self, NonNull}, @@ -41,14 +40,14 @@ pub mod linux { time::Duration, }; - #[cfg(debug_assertions)] - use std::sync::atomic::AtomicUsize; - use libdd_trace_protobuf::opentelemetry::proto::common::v1::{ any_value, AnyValue, KeyValue, ProcessContext, }; use prost::Message; + mod self_reader; + pub use self_reader::ProcessContextSelfReader; + /// Current version of the process context format pub const PROCESS_CTX_VERSION: u32 = 2; /// Signature bytes for identifying process context mappings @@ -116,9 +115,6 @@ pub mod linux { /// single thread should handle context updates, even if it's not strictly required. static PROCESS_CONTEXT_HANDLER: Mutex> = Mutex::new(None); - #[cfg(debug_assertions)] - static LIVE_SELF_READERS: AtomicUsize = AtomicUsize::new(0); - impl MemMapping { /// Creates a suitable memory mapping for the context protocol to be published. /// @@ -435,227 +431,6 @@ pub mod linux { }) } - /// Reader for the current process's OTel process context mapping. - /// - /// Locates the OTEL_CTX mapping at construction. 
Call [`read`](Self::read) repeatedly to fetch - /// updated context data without re-parsing `/proc/self/maps`, as long as the process has not - /// forked. After a `fork()`, reads fail and a new reader must be constructed. - pub struct ProcessContextSelfReader { - pid: libc::pid_t, - header_ptr: NonNull, - } - - // SAFETY: ProcessContextSelfReader doesn't rely on thread local state and - // only references static memory -- owns nothing. - unsafe impl Send for ProcessContextSelfReader {} - // SAFETY: ProcessContextSelfReader doesn't modify anything - unsafe impl Sync for ProcessContextSelfReader {} - - impl ProcessContextSelfReader { - /// Locates the OTEL_CTX mapping in `/proc/self/maps`. - pub fn new() -> io::Result { - let mapping_addr = Self::find_otel_mapping()?; - // SAFETY: getpid() is always safe to call. - let pid = unsafe { libc::getpid() }; - let reader = Self { - pid, - header_ptr: Self::header_ptr_from_addr(mapping_addr)?, - }; - #[cfg(debug_assertions)] - LIVE_SELF_READERS.fetch_add(1, Ordering::Relaxed); - Ok(reader) - } - - /// Reads and decodes the current process's OTel process context. - pub fn read(&self) -> io::Result { - // SAFETY: getpid() is always safe to call. - let current_pid = unsafe { libc::getpid() }; - if current_pid != self.pid { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "process context reader is stale after fork; construct a new reader", - )); - } - - // SAFETY: `header_ptr` is non-null and points to our own process memory at an address - // we found in /proc/self/maps for `self.pid`. The mapping must be readable if it is - // listed as the OTel context. 
- let header = unsafe { self.header_ptr.as_ref() }; - - let published_at = header.monotonic_published_at_ns.load(Ordering::Acquire); - if published_at == UNPUBLISHED_OR_UPDATING { - return Err(io::Error::new( - io::ErrorKind::WouldBlock, - "process context is currently being updated", - )); - } - - // `signature` and `version` are initialized before the release store that publishes - // a non-zero timestamp. If the acquire load above observed that timestamp, those - // writes are visible; if it observed `UNPUBLISHED_OR_UPDATING`, we returned before - // reading them. Updates never mutate these fields, so their accesses are race-free. - // The seqlock-controlled fields must be loaded atomically because they can change - // during an update. - let signature = header.signature; - let version = header.version; - - if signature != *SIGNATURE { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "invalid signature in process context mapping", - )); - } - if version != PROCESS_CTX_VERSION { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("unsupported process context version {version}"), - )); - } - - let payload_size = header.payload_size.load(Ordering::Relaxed); - let payload_ptr = header.payload_ptr.load(Ordering::Relaxed).cast_const(); - - if payload_ptr.is_null() { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "process context payload pointer is null", - )); - } - - let payload_bytes = - Self::read_process_memory(self.pid, payload_ptr, payload_size as usize)?; - - // pairs with the first release fence on update() to ensure that, if we read data - // updated after the initial published time, we at least see the published - // time being set to 0 in the next load of the published time (or we could - // see a later time rather than 0) - fence(Ordering::Acquire); - - let published_at_after = header.monotonic_published_at_ns.load(Ordering::Relaxed); - if published_at != published_at_after { - return Err(io::Error::new( - 
io::ErrorKind::WouldBlock, - "process context changed while being read", - )); - } - - let context = ProcessContext::decode(payload_bytes.as_slice()) - .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; - - Ok(context) - } - - fn header_ptr_from_addr(mapping_addr: usize) -> io::Result> { - NonNull::new(ptr::with_exposed_provenance::(mapping_addr).cast_mut()) - .ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidData, "null process context header") - }) - } - - /// Find the OTEL_CTX mapping in /proc/self/maps. - fn find_otel_mapping() -> io::Result { - let file = File::open("/proc/self/maps")?; - let reader = BufReader::new(file); - - for line in reader.lines() { - let line = line?; - - if Self::is_named_otel_mapping(&line) { - if let Some(addr) = Self::parse_mapping_start(&line) { - return Ok(addr); - } - } - } - - Err(io::Error::new( - io::ErrorKind::NotFound, - "couldn't find the mapping of the process context", - )) - } - - /// Parses the start address from a /proc/self/maps line. - fn parse_mapping_start(line: &str) -> Option { - usize::from_str_radix(line.split('-').next()?, 16).ok() - } - - /// Checks if a mapping line refers to the OTEL_CTX mapping. - fn is_named_otel_mapping(line: &str) -> bool { - let trimmed = line.trim_end(); - - // The name of the mapping is the 6th column. The separator changes (both ' ' and '\t') - // but `split_whitespace()` takes care of that. - let Some(name) = trimmed.split_whitespace().nth(5) else { - return false; - }; - - // The OTel process context spec says to search for entries whose names start with - // these prefixes. In `/proc//maps`, however, the optional ` (deleted)` suffix is - // emitted as a separate token, so the mapping-name token itself should match exactly. - name == "/memfd:OTEL_CTX" - || name == "[anon_shmem:OTEL_CTX]" - || name == "[anon:OTEL_CTX]" - } - - /// Reads `len` bytes from `addr` in the address space of `pid` via `process_vm_readv(2)`. 
- /// - /// Returns [`ErrorKind::WouldBlock`] for retryable races where the remote memory is no - /// longer mapped or only partially readable. The kernel reports the former as - /// [`libc::EFAULT`] from `pin_user_pages_remote()` and the latter as a short read (see - /// `process_vm_rw_core()` in `mm/process_vm_access.c`). - fn read_process_memory( - pid: libc::pid_t, - addr: *const u8, - len: usize, - ) -> io::Result> { - if len == 0 { - return Ok(Vec::new()); - } - - let mut buf = vec![0u8; len]; - let local_iov = libc::iovec { - iov_base: buf.as_mut_ptr().cast(), - iov_len: len, - }; - let remote_iov = libc::iovec { - iov_base: addr.cast_mut().cast(), - iov_len: len, - }; - - // SAFETY: `buf` and `addr` each span `len` bytes for the duration of the syscall. - let nbytes = unsafe { libc::process_vm_readv(pid, &local_iov, 1, &remote_iov, 1, 0) }; - - if nbytes < 0 { - let err = io::Error::last_os_error(); - if err.raw_os_error() == Some(libc::EFAULT) { - return Err(io::Error::new( - io::ErrorKind::WouldBlock, - "process context payload was unmapped during read", - )); - } - return Err(io::Error::new( - err.kind(), - format!("failed to read process context payload: {err}"), - )); - } - - if nbytes as usize != len { - return Err(io::Error::new( - io::ErrorKind::WouldBlock, - "incomplete read of process context payload", - )); - } - - Ok(buf) - } - } - - #[cfg(debug_assertions)] - impl Drop for ProcessContextSelfReader { - fn drop(&mut self) { - LIVE_SELF_READERS.fetch_sub(1, Ordering::Relaxed); - } - } - /// Reads and decodes the current process's OTel process context. /// To read multiple times, construct a new reader with [`ProcessContextSelfReader::new()`]. 
pub fn read() -> io::Result { @@ -771,7 +546,7 @@ pub mod linux { #[cfg(debug_assertions)] debug_assert_eq!( - LIVE_SELF_READERS.load(Ordering::Relaxed), + self_reader::live_reader_count(), 0, "unpublish called while ProcessContextSelfReader instances are live" ); @@ -843,29 +618,6 @@ pub mod linux { }) } - #[test] - fn is_named_otel_mapping_matches_exact_mapping_name() { - assert!(super::ProcessContextSelfReader::is_named_otel_mapping( - "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX" - )); - assert!(super::ProcessContextSelfReader::is_named_otel_mapping( - "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX (deleted)" - )); - assert!(super::ProcessContextSelfReader::is_named_otel_mapping( - "7f000000-7f001000 rw-p 00000000 00:00 0 [anon_shmem:OTEL_CTX]" - )); - assert!(super::ProcessContextSelfReader::is_named_otel_mapping( - "7f000000-7f001000 rw-p 00000000 00:00 0 [anon:OTEL_CTX]" - )); - - assert!(!super::ProcessContextSelfReader::is_named_otel_mapping( - "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX_BACKUP" - )); - assert!(!super::ProcessContextSelfReader::is_named_otel_mapping( - "7f000000-7f001000 rw-p 00000000 00:00 0 [anon:OTEL_CTX_old]" - )); - } - #[test] #[cfg_attr(miri, ignore)] fn publish_then_read_process_context() { diff --git a/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs b/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs new file mode 100644 index 0000000000..ac050a455a --- /dev/null +++ b/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs @@ -0,0 +1,268 @@ +// Copyright 2026-Present Datadog, Inc. 
https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + fs::File, + io::{self, BufRead, BufReader}, + ptr::{self, NonNull}, + sync::atomic::{fence, Ordering}, +}; + +#[cfg(debug_assertions)] +use std::sync::atomic::AtomicUsize; + +use libdd_trace_protobuf::opentelemetry::proto::common::v1::ProcessContext; +use prost::Message; + +use super::{MappingHeader, PROCESS_CTX_VERSION, SIGNATURE, UNPUBLISHED_OR_UPDATING}; + +#[cfg(debug_assertions)] +static LIVE_SELF_READERS: AtomicUsize = AtomicUsize::new(0); + +/// Reader for the current process's OTel process context mapping. +/// +/// Locates the OTEL_CTX mapping at construction. Call [`read`](Self::read) repeatedly to fetch +/// updated context data without re-parsing `/proc/self/maps`, as long as the process has not +/// forked. After a `fork()`, reads fail and a new reader must be constructed. +pub struct ProcessContextSelfReader { + pid: libc::pid_t, + header_ptr: NonNull, +} + +// SAFETY: ProcessContextSelfReader doesn't rely on thread local state and +// only references static memory -- owns nothing. +unsafe impl Send for ProcessContextSelfReader {} +// SAFETY: ProcessContextSelfReader doesn't modify anything +unsafe impl Sync for ProcessContextSelfReader {} + +impl ProcessContextSelfReader { + /// Locates the OTEL_CTX mapping in `/proc/self/maps`. + pub fn new() -> io::Result { + let mapping_addr = Self::find_otel_mapping()?; + // SAFETY: getpid() is always safe to call. + let pid = unsafe { libc::getpid() }; + let reader = Self { + pid, + header_ptr: Self::header_ptr_from_addr(mapping_addr)?, + }; + #[cfg(debug_assertions)] + LIVE_SELF_READERS.fetch_add(1, Ordering::Relaxed); + Ok(reader) + } + + /// Reads and decodes the current process's OTel process context. + pub fn read(&self) -> io::Result { + // SAFETY: getpid() is always safe to call. 
+ let current_pid = unsafe { libc::getpid() }; + if current_pid != self.pid { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "process context reader is stale after fork; construct a new reader", + )); + } + + // SAFETY: `header_ptr` is non-null and points to our own process memory at an address + // we found in /proc/self/maps for `self.pid`. The mapping must be readable if it is + // listed as the OTel context. + let header = unsafe { self.header_ptr.as_ref() }; + + let published_at = header.monotonic_published_at_ns.load(Ordering::Acquire); + if published_at == UNPUBLISHED_OR_UPDATING { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context is currently being updated", + )); + } + + // `signature` and `version` are initialized before the release store that publishes + // a non-zero timestamp. If the acquire load above observed that timestamp, those + // writes are visible; if it observed `UNPUBLISHED_OR_UPDATING`, we returned before + // reading them. Updates never mutate these fields, so their accesses are race-free. + // The seqlock-controlled fields must be loaded atomically because they can change + // during an update. 
+ let signature = header.signature; + let version = header.version; + + if signature != *SIGNATURE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid signature in process context mapping", + )); + } + if version != PROCESS_CTX_VERSION { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unsupported process context version {version}"), + )); + } + + let payload_size = header.payload_size.load(Ordering::Relaxed); + let payload_ptr = header.payload_ptr.load(Ordering::Relaxed).cast_const(); + + if payload_ptr.is_null() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "process context payload pointer is null", + )); + } + + let payload_bytes = + Self::read_process_memory(self.pid, payload_ptr, payload_size as usize)?; + + // pairs with the first release fence on update() to ensure that, if we read data + // updated after the initial published time, we at least see the published + // time being set to 0 in the next load of the published time (or we could + // see a later time rather than 0) + fence(Ordering::Acquire); + + let published_at_after = header.monotonic_published_at_ns.load(Ordering::Relaxed); + if published_at != published_at_after { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context changed while being read", + )); + } + + let context = ProcessContext::decode(payload_bytes.as_slice()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; + + Ok(context) + } + + fn header_ptr_from_addr(mapping_addr: usize) -> io::Result> { + NonNull::new(ptr::with_exposed_provenance::(mapping_addr).cast_mut()) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "null process context header") + }) + } + + /// Find the OTEL_CTX mapping in /proc/self/maps. 
+ pub(super) fn find_otel_mapping() -> io::Result { + let file = File::open("/proc/self/maps")?; + let reader = BufReader::new(file); + + for line in reader.lines() { + let line = line?; + + if Self::is_named_otel_mapping(&line) { + if let Some(addr) = Self::parse_mapping_start(&line) { + return Ok(addr); + } + } + } + + Err(io::Error::new( + io::ErrorKind::NotFound, + "couldn't find the mapping of the process context", + )) + } + + /// Parses the start address from a /proc/self/maps line. + fn parse_mapping_start(line: &str) -> Option { + usize::from_str_radix(line.split('-').next()?, 16).ok() + } + + /// Checks if a mapping line refers to the OTEL_CTX mapping. + fn is_named_otel_mapping(line: &str) -> bool { + let trimmed = line.trim_end(); + + // The name of the mapping is the 6th column. The separator changes (both ' ' and '\t') + // but `split_whitespace()` takes care of that. + let Some(name) = trimmed.split_whitespace().nth(5) else { + return false; + }; + + // The OTel process context spec says to search for entries whose names start with + // these prefixes. In `/proc//maps`, however, the optional ` (deleted)` suffix is + // emitted as a separate token, so the mapping-name token itself should match exactly. + name == "/memfd:OTEL_CTX" || name == "[anon_shmem:OTEL_CTX]" || name == "[anon:OTEL_CTX]" + } + + /// Reads `len` bytes from `addr` in the address space of `pid` via `process_vm_readv(2)`. + /// + /// Returns [`ErrorKind::WouldBlock`] for retryable races where the remote memory is no + /// longer mapped or only partially readable. The kernel reports the former as + /// [`libc::EFAULT`] from `pin_user_pages_remote()` and the latter as a short read (see + /// `process_vm_rw_core()` in `mm/process_vm_access.c`). 
+ fn read_process_memory(pid: libc::pid_t, addr: *const u8, len: usize) -> io::Result> { + if len == 0 { + return Ok(Vec::new()); + } + + let mut buf = vec![0u8; len]; + let local_iov = libc::iovec { + iov_base: buf.as_mut_ptr().cast(), + iov_len: len, + }; + let remote_iov = libc::iovec { + iov_base: addr.cast_mut().cast(), + iov_len: len, + }; + + // SAFETY: `buf` and `addr` each span `len` bytes for the duration of the syscall. + let nbytes = unsafe { libc::process_vm_readv(pid, &local_iov, 1, &remote_iov, 1, 0) }; + + if nbytes < 0 { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::EFAULT) { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "process context payload was unmapped during read", + )); + } + return Err(io::Error::new( + err.kind(), + format!("failed to read process context payload: {err}"), + )); + } + + if nbytes as usize != len { + return Err(io::Error::new( + io::ErrorKind::WouldBlock, + "incomplete read of process context payload", + )); + } + + Ok(buf) + } +} + +#[cfg(debug_assertions)] +impl Drop for ProcessContextSelfReader { + fn drop(&mut self) { + LIVE_SELF_READERS.fetch_sub(1, Ordering::Relaxed); + } +} + +#[cfg(debug_assertions)] +pub(super) fn live_reader_count() -> usize { + LIVE_SELF_READERS.load(Ordering::Relaxed) +} + +#[cfg(test)] +mod tests { + use super::ProcessContextSelfReader; + + #[test] + fn is_named_otel_mapping_matches_exact_mapping_name() { + assert!(ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX" + )); + assert!(ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX (deleted)" + )); + assert!(ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon_shmem:OTEL_CTX]" + )); + assert!(ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon:OTEL_CTX]" + )); + + 
assert!(!ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 /memfd:OTEL_CTX_BACKUP" + )); + assert!(!ProcessContextSelfReader::is_named_otel_mapping( + "7f000000-7f001000 rw-p 00000000 00:00 0 [anon:OTEL_CTX_old]" + )); + } +} From a199105877aac73365c878c5507dd6281d5de3a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Fri, 3 Jul 2026 16:31:13 +0100 Subject: [PATCH 13/14] docs(library-config): clarify zero-filled mapping source --- libdd-library-config/src/otel_process_ctx.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx.rs b/libdd-library-config/src/otel_process_ctx.rs index eb3e3919c6..8c2235092d 100644 --- a/libdd-library-config/src/otel_process_ctx.rs +++ b/libdd-library-config/src/otel_process_ctx.rs @@ -121,10 +121,12 @@ pub mod linux { /// `memfd` is the preferred method, but this function fallbacks to an anonymous mapping if /// `memfd` failed for any reason. /// - /// The memory is guaranteed to be initialized to zeroes. This matters because a - /// memfd-backed mapping is discoverable before `set_name()` runs, so early readers may - /// race with header initialization. They must observe [`UNPUBLISHED_OR_UPDATING`] (0) and - /// stop until the final timestamp store publishes the initialized header. + /// Both allocation paths produce zero-filled memory: `MAP_ANONYMOUS` mappings are + /// initialized to zero, and the memfd path maps a newly-created file extended by + /// `ftruncate()`, whose extended bytes read as `\0`. This matters because a memfd-backed + /// mapping is discoverable before `set_name()` runs, so early readers may race with header + /// initialization. They must observe [`UNPUBLISHED_OR_UPDATING`] (0) and stop until the + /// final timestamp store publishes the initialized header. 
fn new() -> io::Result { let size = mapping_size(); From 2aa0099ed00827350027c5c1838cfe9cdc9d0ecf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20Andr=C3=A9=20dos=20Santos=20Lopes?= Date: Fri, 3 Jul 2026 17:24:12 +0100 Subject: [PATCH 14/14] docs(library-config): cite proc maps pathname format --- .../src/otel_process_ctx/linux/self_reader.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs b/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs index ac050a455a..edf4a23e61 100644 --- a/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs +++ b/libdd-library-config/src/otel_process_ctx/linux/self_reader.rs @@ -166,8 +166,10 @@ impl ProcessContextSelfReader { fn is_named_otel_mapping(line: &str) -> bool { let trimmed = line.trim_end(); - // The name of the mapping is the 6th column. The separator changes (both ' ' and '\t') - // but `split_whitespace()` takes care of that. + // The mapping name is the `pathname` column documented for `/proc//maps`: + // https://github.com/torvalds/linux/blob/9147566d801602c9e7fc7f85e989735735bf38ba/Documentation/filesystems/proc.rst?plain=1#L384-L386 + // For the OTEL_CTX names we care about, it is the 6th whitespace-delimited field; + // `split_whitespace()` ignores the column padding. let Some(name) = trimmed.split_whitespace().nth(5) else { return false; };