From 22aa5ce016a9604c75c7cfee300672e49c9ee583 Mon Sep 17 00:00:00 2001 From: Mar Witek Date: Wed, 24 Jun 2026 01:40:50 +0200 Subject: [PATCH 1/7] Make SharedSpan construction explicit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the blanket `impl From for SharedSpan` — which always registered the span in the harness's `active_roots` — with an explicit `shared_span()` constructor, and migrate the internal call sites. This makes tracked construction a deliberate choice and sets up a later untracked variant for user spans. --- foundations/src/telemetry/tracing/internal.rs | 25 ++++++++----------- foundations/src/telemetry/tracing/mod.rs | 4 +-- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/foundations/src/telemetry/tracing/internal.rs b/foundations/src/telemetry/tracing/internal.rs index 91eb7a6..22fc76c 100644 --- a/foundations/src/telemetry/tracing/internal.rs +++ b/foundations/src/telemetry/tracing/internal.rs @@ -60,14 +60,13 @@ pub(crate) struct SharedSpan { pub(crate) is_sampled: bool, } -impl From for SharedSpan { - fn from(inner: Span) -> Self { - let is_sampled = inner.is_sampled(); +/// Wraps a span and registers it with the internal harness's `active_roots` for live tracking. +pub(crate) fn shared_span(span: Span) -> SharedSpan { + let is_sampled = span.is_sampled(); - Self { - inner: SharedSpanHandle::new(inner), - is_sampled, - } + SharedSpan { + inner: SharedSpanHandle::new(span), + is_sampled, } } @@ -87,11 +86,10 @@ pub fn write_current_span(write_fn: impl FnOnce(&mut Span)) { } pub(crate) fn create_span(name: impl Into>) -> SharedSpan { - match current_span() { + shared_span(match current_span() { Some(parent) => parent.inner.with_read(|s| s.child(name, |o| o.start())), None => start_trace(name, Default::default()), - } - .into() + }) } pub(crate) fn current_span() -> Option { @@ -182,20 +180,19 @@ fn link_new_trace_with_current( pub(crate) fn fork_trace(fork_name: impl Into>) -> SharedSpan { match current_span() { Some(span) if span.is_sampled => span, - _ => return Span::inactive().into(), + _ => return shared_span(Span::inactive()), }; let fork_name = fork_name.into(); - start_trace( + shared_span(start_trace( fork_name, StartTraceOptions { // NOTE: If the current span is sampled, then forked trace is also forcibly sampled override_sampling_ratio: Some(1.0), ..Default::default() }, - ) - .into() + )) } fn create_fork_ref_span(fork_name: &str, current_span: &Span) -> Span { diff --git a/foundations/src/telemetry/tracing/mod.rs b/foundations/src/telemetry/tracing/mod.rs index ac12b91..70ed43c 100644 --- a/foundations/src/telemetry/tracing/mod.rs +++ b/foundations/src/telemetry/tracing/mod.rs @@ -19,7 +19,7 @@ mod rate_limit; mod output_otlp_grpc; use self::init::TracingHarness; -use self::internal::{SharedSpan, create_span, current_span, span_trace_id}; +use self::internal::{SharedSpan, create_span, current_span, shared_span, span_trace_id}; use super::TelemetryContext; use super::scope::Scope; use std::borrow::Cow; @@ -413,7 +413,7 @@ pub fn start_trace( root_span_name: impl Into>, options: StartTraceOptions, ) -> SpanScope { - SpanScope::new(internal::start_trace(root_span_name, options).into()) + SpanScope::new(shared_span(internal::start_trace(root_span_name, options))) } /// Returns the current span as a raw [rustracing] crate's `Span` that is used by Foundations internally. From 47cd747d290549c45706997aa17c34db15790539 Mon Sep 17 00:00:00 2001 From: Mar Witek Date: Wed, 24 Jun 2026 02:53:58 +0200 Subject: [PATCH 2/7] Add core user-tracing span API Adds the user-span pipeline core: an `Untracked` `user_shared_span` constructor, the `start_user_trace`/`user_span` entry points and `UserSpanScope` guard on a separate `USER_HARNESS` (with a `USER_NOOP_HARNESS` fallback), plus the `add_user_span_tags!`/`add_user_span_log_fields!`/`set_user_span_finish_callback!` macros. Includes the test harness (`user_traces()` second sink) so user spans are observed independently of the internal pipeline. `start_user_trace` is name-only here; routing and inbound W3C continuation are layered on in later changes. --- foundations/Cargo.toml | 3 + foundations/src/telemetry/mod.rs | 6 + .../src/telemetry/telemetry_context.rs | 25 +++ foundations/src/telemetry/testing.rs | 18 ++ foundations/src/telemetry/tracing/init.rs | 25 +++ foundations/src/telemetry/tracing/internal.rs | 52 +++++ foundations/src/telemetry/tracing/mod.rs | 205 ++++++++++++++++++ foundations/src/telemetry/tracing/testing.rs | 21 ++ 8 files changed, 355 insertions(+) diff --git a/foundations/Cargo.toml b/foundations/Cargo.toml index 5809b21..3733adf 100644 --- a/foundations/Cargo.toml +++ b/foundations/Cargo.toml @@ -148,6 +148,9 @@ tracing = [ "dep:crossbeam-utils", ] +# Enables distributed user tracing functionality. +user-tracing = ["tracing"] + # Enables metrics functionality. metrics = [ "dep:foundations-macros", diff --git a/foundations/src/telemetry/mod.rs b/foundations/src/telemetry/mod.rs index 9865982..218b8e0 100644 --- a/foundations/src/telemetry/mod.rs +++ b/foundations/src/telemetry/mod.rs @@ -139,6 +139,9 @@ feature_use!(cfg(feature = "tracing"), { }); }); +#[cfg(all(feature = "user-tracing", feature = "testing"))] +use self::tracing::testing::UserTestTracerScope; + #[cfg(feature = "logging")] use self::log::internal::LogScope; @@ -265,6 +268,9 @@ pub struct TelemetryScope { // the harness. #[cfg(all(feature = "tracing", feature = "testing"))] _test_tracer_scope: Option, + + #[cfg(all(feature = "user-tracing", feature = "testing"))] + _user_test_tracer_scope: Option, } /// Telemetry configuration that is passed to [`init`]. diff --git a/foundations/src/telemetry/telemetry_context.rs b/foundations/src/telemetry/telemetry_context.rs index 749f5c7..5074d5c 100644 --- a/foundations/src/telemetry/telemetry_context.rs +++ b/foundations/src/telemetry/telemetry_context.rs @@ -21,6 +21,12 @@ feature_use!(cfg(feature = "tracing"), { }); }); +feature_use!(cfg(feature = "user-tracing"), { + feature_use!(cfg(feature = "testing"), { + use super::tracing::testing::{UserTestTracerScope, current_user_test_tracer}; + }); +}); + #[cfg(feature = "testing")] use super::testing::TestTelemetryContext; @@ -100,6 +106,9 @@ pub struct TelemetryContext { #[cfg(all(feature = "tracing", feature = "testing"))] pub(super) test_tracer: Option, + + #[cfg(all(feature = "user-tracing", feature = "testing"))] + pub(super) user_test_tracer: Option, } impl TelemetryContext { @@ -114,6 +123,9 @@ impl TelemetryContext { #[cfg(all(feature = "tracing", feature = "testing"))] test_tracer: current_test_tracer(), + + #[cfg(all(feature = "user-tracing", feature = "testing"))] + user_test_tracer: current_user_test_tracer(), } } @@ -174,6 +186,13 @@ impl TelemetryContext { #[cfg(all(feature = "tracing", feature = "testing"))] _test_tracer_scope: self.test_tracer.as_ref().cloned().map(TestTracerScope::new), + + #[cfg(all(feature = "user-tracing", feature = "testing"))] + _user_test_tracer_scope: self + .user_test_tracer + .as_ref() + .cloned() + .map(UserTestTracerScope::new), } } @@ -385,6 +404,9 @@ impl TelemetryContext { #[cfg(feature = "testing")] test_tracer: self.test_tracer.clone(), + + #[cfg(all(feature = "user-tracing", feature = "testing"))] + user_test_tracer: self.user_test_tracer.clone(), } } } @@ -460,6 +482,9 @@ impl TelemetryContext { #[cfg(all(feature = "tracing", feature = "testing"))] test_tracer: self.test_tracer.clone(), + + #[cfg(all(feature = "user-tracing", feature = "testing"))] + user_test_tracer: self.user_test_tracer.clone(), } } } diff --git a/foundations/src/telemetry/testing.rs b/foundations/src/telemetry/testing.rs index 16028c1..8eebd54 100644 --- a/foundations/src/telemetry/testing.rs +++ b/foundations/src/telemetry/testing.rs @@ -42,6 +42,9 @@ pub struct TestTelemetryContext { #[cfg(feature = "tracing")] traces_sink: parking_lot::Mutex, + #[cfg(feature = "user-tracing")] + user_traces_sink: parking_lot::Mutex, + #[cfg(feature = "logging")] log_records: TestLogRecords, } @@ -62,6 +65,9 @@ impl TestTelemetryContext { #[cfg(feature = "tracing")] let (tracer, traces_sink) = create_test_tracer(&Default::default()); + #[cfg(feature = "user-tracing")] + let (user_tracer, user_traces_sink) = create_test_tracer(&Default::default()); + TestTelemetryContext { inner: TelemetryContext { #[cfg(feature = "logging")] @@ -72,11 +78,17 @@ impl TestTelemetryContext { #[cfg(feature = "tracing")] test_tracer: Some(tracer), + + #[cfg(feature = "user-tracing")] + user_test_tracer: Some(user_tracer), }, #[cfg(feature = "tracing")] traces_sink: parking_lot::Mutex::new(traces_sink), + #[cfg(feature = "user-tracing")] + user_traces_sink: parking_lot::Mutex::new(user_traces_sink), + #[cfg(feature = "logging")] log_records, } @@ -138,6 +150,12 @@ impl TestTelemetryContext { pub fn traces(&self, options: TestTraceOptions) -> Vec { self.traces_sink.lock().traces(options) } + + /// Returns all the user-tracing traces produced in the test context. + #[cfg(feature = "user-tracing")] + pub fn user_traces(&self, options: TestTraceOptions) -> Vec { + self.user_traces_sink.lock().traces(options) + } } impl Deref for TestTelemetryContext { diff --git a/foundations/src/telemetry/tracing/init.rs b/foundations/src/telemetry/tracing/init.rs index f1c4c77..152e9a9 100644 --- a/foundations/src/telemetry/tracing/init.rs +++ b/foundations/src/telemetry/tracing/init.rs @@ -21,6 +21,9 @@ use std::borrow::Cow; // ensure initialization. Make sure nobody else invalidates our cache lines. static HARNESS: CachePadded> = CachePadded::new(OnceLock::new()); +#[cfg(feature = "user-tracing")] +static USER_HARNESS: CachePadded> = CachePadded::new(OnceLock::new()); + static NOOP_HARNESS: CachePadded> = CachePadded::new(LazyLock::new(|| { let (noop_tracer, _) = Tracer::new(NullSampler.boxed()); @@ -36,6 +39,22 @@ static NOOP_HARNESS: CachePadded> = } })); +#[cfg(feature = "user-tracing")] +static USER_NOOP_HARNESS: CachePadded> = + CachePadded::new(LazyLock::new(|| { + let (noop_tracer, _) = Tracer::new(NullSampler.boxed()); + + TracingHarness { + tracer: noop_tracer, + span_scope_stack: Default::default(), + + #[cfg(feature = "testing")] + test_tracer_scope_stack: Default::default(), + + active_roots: Default::default(), + } + })); + pub(crate) struct TracingHarness { tracer: Tracer, @@ -52,6 +71,12 @@ impl TracingHarness { HARNESS.get().unwrap_or_else(|| &**NOOP_HARNESS) } + /// User-tracing harness, or the user no-op harness when the user pipeline isn't initialized. + #[cfg(feature = "user-tracing")] + pub(crate) fn get_user() -> &'static Self { + USER_HARNESS.get().unwrap_or_else(|| &**USER_NOOP_HARNESS) + } + #[cfg(feature = "testing")] pub(crate) fn tracer(&'static self) -> Cow<'static, Tracer> { self.test_tracer_scope_stack diff --git a/foundations/src/telemetry/tracing/internal.rs b/foundations/src/telemetry/tracing/internal.rs index 22fc76c..db2b237 100644 --- a/foundations/src/telemetry/tracing/internal.rs +++ b/foundations/src/telemetry/tracing/internal.rs @@ -70,6 +70,21 @@ pub(crate) fn shared_span(span: Span) -> SharedSpan { } } +/// Wraps a user span as `Untracked`/`Inactive`, bypassing `active_roots` so user spans never +/// enter the internal harness's live registry. +#[cfg(feature = "user-tracing")] +pub(crate) fn user_shared_span(span: Span) -> SharedSpan { + let is_sampled = span.is_sampled(); + + let inner = if is_sampled { + SharedSpanHandle::Untracked(Arc::new(RwLock::new(span))) + } else { + SharedSpanHandle::Inactive + }; + + SharedSpan { inner, is_sampled } +} + pub fn write_current_span(write_fn: impl FnOnce(&mut Span)) { let span = match current_span() { Some(span) if span.is_sampled => span, @@ -135,6 +150,43 @@ pub(crate) fn start_trace( new_trace_root_span } +#[cfg(feature = "user-tracing")] +pub(crate) fn current_user_span() -> Option { + TracingHarness::get_user().span_scope_stack.current() +} + +/// Child of the current user span, or inactive when no user trace is active (never a root). +#[cfg(feature = "user-tracing")] +pub(crate) fn create_user_span(name: impl Into>) -> SharedSpan { + match current_user_span() { + Some(parent) => user_shared_span(parent.inner.with_read(|s| s.child(name, |o| o.start()))), + None => user_shared_span(Span::inactive()), + } +} + +#[cfg(feature = "user-tracing")] +pub fn write_current_user_span(write_fn: impl FnOnce(&mut Span)) { + let span = match current_user_span() { + Some(span) if span.is_sampled => span, + _ => return, + }; + + let mut span_guard = match &span.inner { + SharedSpanHandle::Tracked(handle) => handle.write(), + SharedSpanHandle::Untracked(rw_lock) => rw_lock.write(), + SharedSpanHandle::Inactive => unreachable!("inactive spans can't be sampled"), + }; + + write_fn(&mut span_guard); +} + +/// Starts a root user span on the user harness. +#[cfg(feature = "user-tracing")] +pub(crate) fn start_user_trace(name: impl Into>) -> Span { + let tracer = TracingHarness::get_user().tracer(); + tracer.span(name).start() +} + pub(super) fn reporter_error(err: impl Error) { #[cfg(feature = "logging")] crate::telemetry::log::error!("failed to report traces to the traces sink"; "error" => %err); diff --git a/foundations/src/telemetry/tracing/mod.rs b/foundations/src/telemetry/tracing/mod.rs index 70ed43c..b359df5 100644 --- a/foundations/src/telemetry/tracing/mod.rs +++ b/foundations/src/telemetry/tracing/mod.rs @@ -20,6 +20,8 @@ mod output_otlp_grpc; use self::init::TracingHarness; use self::internal::{SharedSpan, create_span, current_span, shared_span, span_trace_id}; +#[cfg(feature = "user-tracing")] +use self::internal::{create_user_span, user_shared_span}; use super::TelemetryContext; use super::scope::Scope; use std::borrow::Cow; @@ -218,6 +220,25 @@ impl SpanScope { } } +/// A handle for the scope in which a user-tracing span is active. +/// +/// Scope ends when the handle is dropped. +#[cfg(feature = "user-tracing")] +#[must_use] +pub struct UserSpanScope { + _inner: Scope, +} + +#[cfg(feature = "user-tracing")] +impl UserSpanScope { + #[inline] + pub(crate) fn new(span: SharedSpan) -> Self { + Self { + _inner: Scope::new(&TracingHarness::get_user().span_scope_stack, span), + } + } +} + /// Options for a new trace. #[derive(Default, Debug)] pub struct StartTraceOptions { @@ -416,6 +437,21 @@ pub fn start_trace( SpanScope::new(shared_span(internal::start_trace(root_span_name, options))) } +/// Starts a root user span (per-request activation). +/// +/// Without an active root, `user_span` / `add_user_span_tags!` are no-ops. +#[cfg(feature = "user-tracing")] +pub fn start_user_trace(name: impl Into>) -> UserSpanScope { + UserSpanScope::new(user_shared_span(internal::start_user_trace(name))) +} + +/// Creates a user span as a child of the current user span, or inactive when no user trace is +/// active. Never starts a root — roots come only from [`start_user_trace`]. +#[cfg(feature = "user-tracing")] +pub fn user_span(name: impl Into>) -> UserSpanScope { + UserSpanScope::new(create_user_span(name)) +} + /// Returns the current span as a raw [rustracing] crate's `Span` that is used by Foundations internally. /// /// Can be used to propagate the tracing context to libraries that don't use Foundations' @@ -728,6 +764,68 @@ macro_rules! __set_span_finish_callback { }}; } +/// Adds tags to the current user span. No-op when no user trace is active. +#[cfg(feature = "user-tracing")] +#[macro_export] +#[doc(hidden)] +macro_rules! __add_user_span_tags { + ( $( $name:expr => $val:expr ),+ ) => { + $crate::telemetry::tracing::internal::write_current_user_span(|span| { + span.set_tags(|| { + vec![ $($crate::reexports_for_macros::cf_rustracing::tag::Tag::new($name, $val)),+ ] + }); + }); + }; + + ( $tags:expr ) => { + $crate::telemetry::tracing::internal::write_current_user_span(|span| { + span.set_tags(|| { + $tags + .into_iter() + .map(|(name, val)| { + $crate::reexports_for_macros::cf_rustracing::tag::Tag::new(name, val) + }) + }); + }); + }; +} + +/// Adds log fields to the current user span. No-op when no user trace is active. +#[cfg(feature = "user-tracing")] +#[macro_export] +#[doc(hidden)] +macro_rules! __add_user_span_log_fields { + ( $( $field:expr => $val:expr ),+ ) => { + $crate::telemetry::tracing::internal::write_current_user_span(|span| { + span.log(|builder| { + $( + builder.field(($field, $val)); + )+ + }); + }); + }; +} + +/// Sets (`$cb`) or clears (`None`) the finish callback on the current user span. No-op when no +/// user trace is active. Routing is set at construction by `start_user_trace`, so this is a +/// general escape hatch — not used for routing. +#[cfg(feature = "user-tracing")] +#[macro_export] +#[doc(hidden)] +macro_rules! __set_user_span_finish_callback { + ( None ) => { + $crate::telemetry::tracing::internal::write_current_user_span(|span| { + span.take_finish_callback(); + }) + }; + ( $cb:expr ) => {{ + let cb = $cb; + $crate::telemetry::tracing::internal::write_current_user_span(move |span| { + span.set_finish_callback(cb); + }) + }}; +} + /// A convenience macro to construct [`TestTrace`] for test assertions. /// /// Note that for span timings the macro always generates default @@ -917,6 +1015,113 @@ pub use { __set_span_finish_time as set_span_finish_time, __set_span_start_time as set_span_start_time, }; +#[cfg(feature = "user-tracing")] +#[doc(inline)] +pub use { + __add_user_span_log_fields as add_user_span_log_fields, + __add_user_span_tags as add_user_span_tags, + __set_user_span_finish_callback as set_user_span_finish_callback, +}; + #[cfg(feature = "testing")] #[doc(inline)] pub use __test_trace as test_trace; + +#[cfg(all(test, feature = "user-tracing", feature = "testing"))] +mod user_tracing_tests { + use super::{ + add_user_span_log_fields, add_user_span_tags, set_user_span_finish_callback, + start_user_trace, test_trace, user_span, + }; + use crate::telemetry::TelemetryContext; + use crate::telemetry::tracing::{Span, TestTraceOptions}; + use cf_rustracing::tag::{Tag, TagValue}; + + #[test] + fn creation_and_nesting() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request"); + let _child = user_span("child"); + let _grandchild = user_span("grandchild"); + } + + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { + "request" => { + "child" => { + "grandchild" + } + } + }] + ); + // User spans must not leak into the internal pipeline. + assert!(ctx.traces(Default::default()).is_empty()); + } + + #[test] + fn tags_and_logs() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request"); + add_user_span_tags!("cache.status" => "HIT"); + add_user_span_log_fields!("event" => "lookup"); + } + + let opts = TestTraceOptions { + include_tags: true, + include_logs: true, + ..Default::default() + }; + let traces = ctx.user_traces(opts); + let root = &traces[0].0; + + assert!( + root.tags + .contains(&("cache.status".to_string(), TagValue::String("HIT".into()))) + ); + assert!( + root.logs + .contains(&("event".to_string(), "lookup".to_string())) + ); + } + + #[test] + fn no_op_without_activation() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + // No `start_user_trace`, so user tracing isn't active for this scope. + let _child = user_span("child"); + add_user_span_tags!("k" => "v"); + } + + assert!(ctx.user_traces(Default::default()).is_empty()); + } + + #[test] + fn finish_callback_runs() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request"); + set_user_span_finish_callback!(|span: &mut Span| { + span.set_tag(|| Tag::new("finished", true)); + }); + } + + let opts = TestTraceOptions { + include_tags: true, + ..Default::default() + }; + let traces = ctx.user_traces(opts); + assert!(traces[0].0.tags.iter().any(|(k, _)| k == "finished")); + } +} diff --git a/foundations/src/telemetry/tracing/testing.rs b/foundations/src/telemetry/tracing/testing.rs index 966bbe1..0234f37 100644 --- a/foundations/src/telemetry/tracing/testing.rs +++ b/foundations/src/telemetry/tracing/testing.rs @@ -160,6 +160,22 @@ impl TestTracerScope { } } +#[cfg(feature = "user-tracing")] +#[must_use] +pub(crate) struct UserTestTracerScope { + _inner: Scope, +} + +#[cfg(feature = "user-tracing")] +impl UserTestTracerScope { + #[inline] + pub(crate) fn new(tracer: Tracer) -> Self { + Self { + _inner: Scope::new(&TracingHarness::get_user().test_tracer_scope_stack, tracer), + } + } +} + pub(crate) struct TestTracesSink { span_rx: SharedSpanReceiver, raw_spans: HashMap>, @@ -254,6 +270,11 @@ pub(crate) fn current_test_tracer() -> Option { TracingHarness::get().test_tracer_scope_stack.current() } +#[cfg(feature = "user-tracing")] +pub(crate) fn current_user_test_tracer() -> Option { + TracingHarness::get_user().test_tracer_scope_stack.current() +} + pub(crate) fn create_test_tracer(settings: &TracingSettings) -> (Tracer, TestTracesSink) { let (tracer, span_rx) = create_tracer_and_span_rx(settings).expect("should create tracer with default settings"); From 8dad0e449d80562f74e0c7d0f2f35dab83bdd968 Mon Sep 17 00:00:00 2001 From: Mar Witek Date: Wed, 24 Jun 2026 03:00:47 +0200 Subject: [PATCH 3/7] Propagate user spans through TelemetryContext MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a `user_span` slot to `TelemetryContext` (captured by `current()`, re-established by `scope()`, cloned across forks) plus `UserSpanScope::into_context()` and a parallel carry on `SpanScope::into_context()`. This lets a user span survive `.await`/`tokio::spawn` and ride along even when propagation goes through an internal span's context — no explicit threading. Verified by `propagates_across_await` and `user_span_carried_by_internal_context`. --- foundations/src/telemetry/mod.rs | 6 ++ .../src/telemetry/telemetry_context.rs | 18 +++++ foundations/src/telemetry/testing.rs | 3 + foundations/src/telemetry/tracing/mod.rs | 78 ++++++++++++++++++- 4 files changed, 104 insertions(+), 1 deletion(-) diff --git a/foundations/src/telemetry/mod.rs b/foundations/src/telemetry/mod.rs index 218b8e0..63c39ae 100644 --- a/foundations/src/telemetry/mod.rs +++ b/foundations/src/telemetry/mod.rs @@ -139,6 +139,9 @@ feature_use!(cfg(feature = "tracing"), { }); }); +#[cfg(feature = "user-tracing")] +use self::tracing::UserSpanScope; + #[cfg(all(feature = "user-tracing", feature = "testing"))] use self::tracing::testing::UserTestTracerScope; @@ -263,6 +266,9 @@ pub struct TelemetryScope { #[cfg(feature = "tracing")] _span_scope: Option, + #[cfg(feature = "user-tracing")] + _user_span_scope: Option, + // NOTE: certain tracing APIs start a new trace, so we need to scope the test tracer // for them to use the tracer from the test scope instead of production tracer in // the harness. diff --git a/foundations/src/telemetry/telemetry_context.rs b/foundations/src/telemetry/telemetry_context.rs index 5074d5c..75d7ccf 100644 --- a/foundations/src/telemetry/telemetry_context.rs +++ b/foundations/src/telemetry/telemetry_context.rs @@ -22,6 +22,9 @@ feature_use!(cfg(feature = "tracing"), { }); feature_use!(cfg(feature = "user-tracing"), { + use super::tracing::UserSpanScope; + use super::tracing::internal::current_user_span; + feature_use!(cfg(feature = "testing"), { use super::tracing::testing::{UserTestTracerScope, current_user_test_tracer}; }); @@ -104,6 +107,9 @@ pub struct TelemetryContext { #[cfg(feature = "tracing")] pub(super) span: Option, + #[cfg(feature = "user-tracing")] + pub(super) user_span: Option, + #[cfg(all(feature = "tracing", feature = "testing"))] pub(super) test_tracer: Option, @@ -121,6 +127,9 @@ impl TelemetryContext { #[cfg(feature = "tracing")] span: current_span(), + #[cfg(feature = "user-tracing")] + user_span: current_user_span(), + #[cfg(all(feature = "tracing", feature = "testing"))] test_tracer: current_test_tracer(), @@ -184,6 +193,9 @@ impl TelemetryContext { #[cfg(feature = "tracing")] _span_scope: self.span.as_ref().cloned().map(SpanScope::new), + #[cfg(feature = "user-tracing")] + _user_span_scope: self.user_span.as_ref().cloned().map(UserSpanScope::new), + #[cfg(all(feature = "tracing", feature = "testing"))] _test_tracer_scope: self.test_tracer.as_ref().cloned().map(TestTracerScope::new), @@ -402,6 +414,9 @@ impl TelemetryContext { span: Some(fork_trace(fork_name)), + #[cfg(feature = "user-tracing")] + user_span: self.user_span.clone(), + #[cfg(feature = "testing")] test_tracer: self.test_tracer.clone(), @@ -480,6 +495,9 @@ impl TelemetryContext { #[cfg(feature = "tracing")] span: self.span.clone(), + #[cfg(feature = "user-tracing")] + user_span: self.user_span.clone(), + #[cfg(all(feature = "tracing", feature = "testing"))] test_tracer: self.test_tracer.clone(), diff --git a/foundations/src/telemetry/testing.rs b/foundations/src/telemetry/testing.rs index 8eebd54..51a33bc 100644 --- a/foundations/src/telemetry/testing.rs +++ b/foundations/src/telemetry/testing.rs @@ -76,6 +76,9 @@ impl TestTelemetryContext { #[cfg(feature = "tracing")] span: None, + #[cfg(feature = "user-tracing")] + user_span: None, + #[cfg(feature = "tracing")] test_tracer: Some(tracer), diff --git a/foundations/src/telemetry/tracing/mod.rs b/foundations/src/telemetry/tracing/mod.rs index b359df5..023a4c0 100644 --- a/foundations/src/telemetry/tracing/mod.rs +++ b/foundations/src/telemetry/tracing/mod.rs @@ -158,6 +158,11 @@ pub use foundations_macros::span_fn; pub struct SpanScope { span: SharedSpan, _inner: Scope, + + #[cfg(feature = "user-tracing")] + user_span: Option, + #[cfg(feature = "user-tracing")] + _user_inner: Option>, } impl SpanScope { @@ -166,6 +171,11 @@ impl SpanScope { Self { span: span.clone(), _inner: Scope::new(&TracingHarness::get().span_scope_stack, span), + + #[cfg(feature = "user-tracing")] + user_span: None, + #[cfg(feature = "user-tracing")] + _user_inner: None, } } @@ -216,6 +226,11 @@ impl SpanScope { ctx.span = Some(self.span); + #[cfg(feature = "user-tracing")] + if let Some(user_span) = self.user_span { + ctx.user_span = Some(user_span); + } + ctx } } @@ -226,6 +241,7 @@ impl SpanScope { #[cfg(feature = "user-tracing")] #[must_use] pub struct UserSpanScope { + span: SharedSpan, _inner: Scope, } @@ -234,9 +250,18 @@ impl UserSpanScope { #[inline] pub(crate) fn new(span: SharedSpan) -> Self { Self { + span: span.clone(), _inner: Scope::new(&TracingHarness::get_user().span_scope_stack, span), } } + + /// Converts the user span scope to a [`TelemetryContext`] that can be applied to a future. + pub fn into_context(self) -> TelemetryContext { + let mut ctx = TelemetryContext::current(); + ctx.user_span = Some(self.span); + + ctx + } } /// Options for a new trace. @@ -1030,7 +1055,7 @@ pub use __test_trace as test_trace; #[cfg(all(test, feature = "user-tracing", feature = "testing"))] mod user_tracing_tests { use super::{ - add_user_span_log_fields, add_user_span_tags, set_user_span_finish_callback, + add_user_span_log_fields, add_user_span_tags, set_user_span_finish_callback, span, start_user_trace, test_trace, user_span, }; use crate::telemetry::TelemetryContext; @@ -1124,4 +1149,55 @@ mod user_tracing_tests { let traces = ctx.user_traces(opts); assert!(traces[0].0.tags.iter().any(|(k, _)| k == "finished")); } + + #[tokio::test] + async fn propagates_across_await() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let root_ctx = start_user_trace("request").into_context(); + root_ctx + .apply(async { + let _child = user_span("child"); + }) + .await; + } + + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "child" } }] + ); + } + + // The user span rides along on the ambient `TelemetryContext` even when propagation goes + // through an *internal* span's `into_context()` — no explicit user-span threading needed. + #[tokio::test] + async fn user_span_carried_by_internal_context() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request"); + + // Propagate via an internal span's context; never touch the user scope. + span("internal") + .into_context() + .apply(async { + let _user_child = user_span("user_child"); + }) + .await; + } + + // User pipeline: the user child nested under the user root (the user span was carried). + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "user_child" } }] + ); + // Internal pipeline: just the internal span. + assert_eq!( + ctx.traces(Default::default()), + vec![test_trace! { "internal" }] + ); + } } From e646eaee92ec3a7d194acadfbf03b1bd442ca948 Mon Sep 17 00:00:00 2001 From: Mar Witek Date: Wed, 24 Jun 2026 03:06:28 +0200 Subject: [PATCH 4/7] Add user-span ergonomic instrumentation Adds `SpanScope::with_user_span()` to open a parallel user span off an internal span (named after it), and a `user = true` option on `#[span_fn]` that does the same for whole functions (sync and async). Both are no-ops when no user trace is active. Covered by macro snapshot tests plus parallel and no-op runtime tests. --- foundations-macros/src/span_fn.rs | 84 +++++++++++++++++- foundations/src/telemetry/tracing/mod.rs | 104 ++++++++++++++++++++++- 2 files changed, 186 insertions(+), 2 deletions(-) diff --git a/foundations-macros/src/span_fn.rs b/foundations-macros/src/span_fn.rs index def9672..3c3b098 100644 --- a/foundations-macros/src/span_fn.rs +++ b/foundations-macros/src/span_fn.rs @@ -47,6 +47,9 @@ struct Options { #[darling(default = "Options::default_generic")] generic: bool, + + #[darling(default = "Options::default_user")] + user: bool, } impl Options { @@ -61,6 +64,10 @@ impl Options { fn default_generic() -> bool { cfg!(foundations_generic_telemetry_wrapper) } + + fn default_user() -> bool { + false + } } struct Args { @@ -119,9 +126,10 @@ fn expand_from_parsed(args: Args, item_fn: ItemFn) -> TokenStream2 { None => try_async_trait_fn_rewrite(&args, &block).unwrap_or_else(|| { let span_name = args.span_name.as_tokens(); let crate_path = &args.options.crate_path; + let user_span = with_user_span_call(&args.options); quote!( - let __span = #crate_path::telemetry::tracing::span(#span_name); + let __span = #crate_path::telemetry::tracing::span(#span_name)#user_span; #block ) }), @@ -191,15 +199,26 @@ fn wrap_with_span(args: &Args, block: TokenStream2) -> TokenStream2 { let span_name = args.span_name.as_tokens(); let crate_path = &args.options.crate_path; + let user_span = with_user_span_call(&args.options); quote!( #crate_path::telemetry::tracing::span(#span_name) + #user_span .into_context() .#apply_fn(#block) .await ) } +/// Emits `.with_user_span()` when `user = true`, otherwise nothing. +fn with_user_span_call(options: &Options) -> TokenStream2 { + if options.user { + quote!(.with_user_span()) + } else { + quote!() + } +} + #[cfg(test)] mod tests { use super::*; @@ -236,6 +255,36 @@ mod tests { assert_eq!(actual, expected); } + #[test] + fn expand_sync_fn_user() { + let args = parse_attr! { + #[span_fn("sync_span", user = true)] + }; + + let item_fn = parse_quote! { + fn do_sync() -> io::Result { + do_something_else(); + + Ok("foo".into()) + } + }; + + let actual = expand_from_parsed(args, item_fn).to_string(); + + let expected = code_str! { + fn do_sync<>() -> io::Result { + let __span = ::foundations::telemetry::tracing::span("sync_span").with_user_span(); + { + do_something_else(); + + Ok("foo".into()) + } + } + }; + + assert_eq!(actual, expected); + } + #[test] fn expand_sync_fn_const_span_name() { let args = parse_attr! { @@ -298,6 +347,39 @@ mod tests { assert_eq!(actual, expected); } + #[test] + fn expand_async_fn_user() { + let args = parse_attr! { + #[span_fn("async_span", user = true)] + }; + + let item_fn = parse_quote! { + async fn do_async() -> io::Result { + do_something_else().await; + + Ok("foo".into()) + } + }; + + let actual = expand_from_parsed(args, item_fn).to_string(); + + let expected = code_str! { + async fn do_async<>() -> io::Result { + ::foundations::telemetry::tracing::span("async_span") + .with_user_span() + .into_context() + .apply(async move {{ + do_something_else().await; + + Ok("foo".into()) + }}) + .await + } + }; + + assert_eq!(actual, expected); + } + #[test] fn expand_async_fn_local() { let args = parse_attr! { diff --git a/foundations/src/telemetry/tracing/mod.rs b/foundations/src/telemetry/tracing/mod.rs index 023a4c0..7e49e8d 100644 --- a/foundations/src/telemetry/tracing/mod.rs +++ b/foundations/src/telemetry/tracing/mod.rs @@ -21,9 +21,11 @@ mod output_otlp_grpc; use self::init::TracingHarness; use self::internal::{SharedSpan, create_span, current_span, shared_span, span_trace_id}; #[cfg(feature = "user-tracing")] -use self::internal::{create_user_span, user_shared_span}; +use self::internal::{create_user_span, current_user_span, user_shared_span}; use super::TelemetryContext; use super::scope::Scope; +#[cfg(feature = "user-tracing")] +use cf_rustracing::span::InspectableSpan; use std::borrow::Cow; use std::sync::Arc; @@ -179,6 +181,27 @@ impl SpanScope { } } + /// Opens a parallel user span (child of the current user span, named after this span) when a + /// user trace is active; otherwise a no-op. The user span shares this scope's lifetime. + #[cfg(feature = "user-tracing")] + pub fn with_user_span(mut self) -> Self { + if current_user_span().is_some() { + let name = self + .span + .inner + .with_read(|s| s.operation_name().to_string()); + let user_span = create_user_span(name); + + self._user_inner = Some(Scope::new( + &TracingHarness::get_user().span_scope_stack, + user_span.clone(), + )); + self.user_span = Some(user_span); + } + + self + } + /// Converts the span scope to [`TelemetryContext`] that can be a applied to a future. /// /// This is effectively a shorthand for calling [`TelemetryContext::current`] with the span @@ -1116,6 +1139,25 @@ mod user_tracing_tests { ); } + #[test] + fn with_user_span_is_parallel() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request"); + let _s = span("op").with_user_span(); + } + + // Internal pipeline: just the internal span. + assert_eq!(ctx.traces(Default::default()), vec![test_trace! { "op" }]); + // User pipeline: the parallel user span nested under the user root. + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "op" } }] + ); + } + #[test] fn no_op_without_activation() { let ctx = TelemetryContext::test(); @@ -1200,4 +1242,64 @@ mod user_tracing_tests { vec![test_trace! { "internal" }] ); } + + // Same property via the `#[span_fn]` macro path (a plain internal-traced async fn). + #[crate::telemetry::tracing::span_fn("internal_fn", crate_path = "crate")] + async fn internal_fn() { + let _user_child = user_span("user_child"); + } + + #[tokio::test] + async fn user_span_carried_by_span_fn() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request"); + internal_fn().await; + } + + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "user_child" } }] + ); + assert_eq!( + ctx.traces(Default::default()), + vec![test_trace! { "internal_fn" }] + ); + } + + // `with_user_span()` is a no-op when no user trace is active: the internal span is still + // created, but no parallel user span is produced. + #[test] + fn with_user_span_no_op_when_inactive() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + // No `start_user_trace` => user tracing not active for this scope. + let _s = span("op").with_user_span(); + } + + assert_eq!(ctx.traces(Default::default()), vec![test_trace! { "op" }]); + assert!(ctx.user_traces(Default::default()).is_empty()); + } + + #[crate::telemetry::tracing::span_fn("user_fn", user = true, crate_path = "crate")] + async fn user_fn() {} + + // `#[span_fn(user = true)]` is likewise a no-op for the user pipeline when inactive. + #[tokio::test] + async fn span_fn_user_no_op_when_inactive() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + user_fn().await; + + assert_eq!( + ctx.traces(Default::default()), + vec![test_trace! { "user_fn" }] + ); + assert!(ctx.user_traces(Default::default()).is_empty()); + } } From cc6279b1e5846fb2ddd0868d97daf8efb23e7e8e Mon Sep 17 00:00:00 2001 From: Mar Witek Date: Wed, 24 Jun 2026 03:09:40 +0200 Subject: [PATCH 5/7] Temporarily patch cf-rustracing to the routing branch Points `cf-rustracing` at the fork branch that adds `RoutingMetadata` as a span property, needed by the user-tracing exporter and `start_user_trace` routing. Placeholder to be replaced by a normal version bump once the rustracing change is released. --- Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 7c7c1ec..ea03e74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,3 +120,7 @@ tower = "0.5.3" # Only used in deprecated `foundations` code # TODO: remove before next major release sentry-core = { version = "0.36.0", default-features = false } + +# TEMPORARY +[patch.crates-io] +cf-rustracing = { git = "https://github.com/mar-cf/rustracing.git", branch = "user-tracing" } From 1624510bbb57905b4a47b205fb82da4de1a3e2ef Mon Sep 17 00:00:00 2001 From: Mar Witek Date: Wed, 24 Jun 2026 03:20:35 +0200 Subject: [PATCH 6/7] Add user-tracing pipeline and span routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the per-process user pipeline — `UserTracingSettings`, `init_user`/`USER_HARNESS`, and the OTLP-over-UDS exporter that encodes `RoutingMetadata` into the `cf-trace-config` header — wired into `telemetry::init`. `start_user_trace` now takes a required `RoutingMetadata` attached at span construction and inherited by descendants (the exporter drops routing-less spans). Verified end-to-end by producer tests that decode the exported OTLP body. --- Cargo.toml | 1 + foundations/Cargo.toml | 12 +- foundations/src/telemetry/mod.rs | 15 +- foundations/src/telemetry/settings/mod.rs | 10 + .../src/telemetry/settings/user_tracing.rs | 110 ++++ foundations/src/telemetry/tracing/init.rs | 63 ++ foundations/src/telemetry/tracing/internal.rs | 12 +- foundations/src/telemetry/tracing/mod.rs | 46 +- .../src/telemetry/tracing/output_otlp_uds.rs | 552 ++++++++++++++++++ 9 files changed, 803 insertions(+), 18 deletions(-) create mode 100644 foundations/src/telemetry/settings/user_tracing.rs create mode 100644 foundations/src/telemetry/tracing/output_otlp_uds.rs diff --git a/Cargo.toml b/Cargo.toml index ea03e74..4d0e797 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ proc-macro2 = { version = "1.0.106", default-features = false } prometheus = { version = "0.14.0", default-features = false } prometheus-client = "0.18.1" prometools = "0.2.3" +prost = "0.14" rand = "0.10.1" percent-encoding = "2.3.2" quote = "1.0.45" diff --git a/foundations/Cargo.toml b/foundations/Cargo.toml index 3733adf..4511d71 100644 --- a/foundations/Cargo.toml +++ b/foundations/Cargo.toml @@ -149,7 +149,16 @@ tracing = [ ] # Enables distributed user tracing functionality. -user-tracing = ["tracing"] +user-tracing = [ + "tracing", + "dep:hyper", + "hyper/client", + "dep:hyper-util", + "dep:http-body-util", + "dep:serde_json", + "dep:prost", + "tokio/net", +] # Enables metrics functionality. metrics = [ @@ -240,6 +249,7 @@ percent-encoding = { workspace = true, optional = true } prometheus = { workspace = true, optional = true, features = ["process"] } prometheus-client = { workspace = true, optional = true } prometools = { workspace = true, optional = true, features = ["serde"] } +prost = { workspace = true, optional = true } rand = { workspace = true, optional = true } serde = { workspace = true, optional = true, features = ["derive", "rc"] } serde_json = { workspace = true, optional = true } diff --git a/foundations/src/telemetry/mod.rs b/foundations/src/telemetry/mod.rs index 63c39ae..9217d0d 100644 --- a/foundations/src/telemetry/mod.rs +++ b/foundations/src/telemetry/mod.rs @@ -73,7 +73,10 @@ mod scope; mod telemetry_context; -#[cfg(all(feature = "tracing", feature = "telemetry-otlp-grpc"))] +#[cfg(all( + feature = "tracing", + any(feature = "telemetry-otlp-grpc", feature = "user-tracing") +))] mod otlp_conversion; #[cfg(feature = "testing")] @@ -359,6 +362,16 @@ pub fn init(config: TelemetryConfig) -> BootstrapResult { } } + #[cfg(feature = "user-tracing")] + { + if let Some(user_settings) = &config.settings.user_tracing { + let initializer = self::tracing::init::init_user(config.service_info, user_settings)?; + if let Some(fut) = initializer { + tele_futures.push(fut); + } + } + } + TELEMETRY_INITIALIZED.store(true, Ordering::Relaxed); #[cfg(feature = "telemetry-server")] diff --git a/foundations/src/telemetry/settings/mod.rs b/foundations/src/telemetry/settings/mod.rs index fd52b9c..4d77c22 100644 --- a/foundations/src/telemetry/settings/mod.rs +++ b/foundations/src/telemetry/settings/mod.rs @@ -6,6 +6,9 @@ mod otlp_grpc_output; #[cfg(feature = "tracing")] mod tracing; +#[cfg(feature = "user-tracing")] +mod user_tracing; + #[cfg(feature = "logging")] mod logging; @@ -27,6 +30,9 @@ pub use self::otlp_grpc_output::*; #[cfg(feature = "tracing")] pub use self::tracing::*; +#[cfg(feature = "user-tracing")] +pub use self::user_tracing::*; + #[cfg(feature = "logging")] pub use self::logging::*; @@ -53,6 +59,10 @@ pub struct TelemetrySettings { #[cfg(feature = "tracing")] pub tracing: TracingSettings, + /// Distributed user tracing settings + #[cfg(feature = "user-tracing")] + pub user_tracing: Option, + /// Logging settings. #[cfg(feature = "logging")] pub logging: LoggingSettings, diff --git a/foundations/src/telemetry/settings/user_tracing.rs b/foundations/src/telemetry/settings/user_tracing.rs new file mode 100644 index 0000000..0437f37 --- /dev/null +++ b/foundations/src/telemetry/settings/user_tracing.rs @@ -0,0 +1,110 @@ +use std::num::NonZeroUsize; + +#[cfg(feature = "settings")] +use crate::settings::settings; + +/// Distributed user tracing settings. +#[cfg_attr(feature = "settings", settings(crate_path = "crate"))] +#[cfg_attr(not(feature = "settings"), derive(Clone, Debug, serde::Deserialize))] +pub struct UserTracingSettings { + /// Enables user tracing. + #[serde(default = "UserTracingSettings::default_enabled")] + pub enabled: bool, + + /// Maximum number of spans to buffer for output. Any spans above + /// this limit will be dropped until the queue regains capacity. + /// + /// The default is to buffer up to 1 million spans in memory. This protects + /// services from out-of-memory errors when the output gets heavily backed up. + /// To disable the limit entirely, set this setting to `None`. + #[serde(default = "UserTracingSettings::default_max_queue_size")] + pub max_queue_size: Option, + + /// The output for the collected user traces. + pub output: UserTracesOutput, +} + +#[cfg(not(feature = "settings"))] +impl Default for UserTracingSettings { + fn default() -> Self { + Self { + enabled: UserTracingSettings::default_enabled(), + max_queue_size: UserTracingSettings::default_max_queue_size(), + output: Default::default(), + } + } +} + +impl UserTracingSettings { + fn default_enabled() -> bool { + true + } + + const fn default_max_queue_size() -> Option { + Some(const { NonZeroUsize::new(1_000_000).expect("1_000_000 is not zero") }) + } +} + +/// The output for the collected user traces. +#[cfg_attr( + feature = "settings", + settings(crate_path = "crate", impl_default = false) +)] +#[cfg_attr(not(feature = "settings"), derive(Clone, Debug, serde::Deserialize))] +pub enum UserTracesOutput { + /// Send user tracing spans as OTLP over a Unix domain socket to an OTLP endpoint. + OtlpUds(OtlpUdsOutputSettings), +} + +impl Default for UserTracesOutput { + fn default() -> Self { + Self::OtlpUds(Default::default()) + } +} + +/// [OTLP over UDS] output settings for user tracing. +/// +/// Sends trace data as protobuf-encoded OTLP over HTTP to a Unix domain socket. +#[cfg_attr(feature = "settings", settings(crate_path = "crate"))] +#[cfg_attr(not(feature = "settings"), derive(Clone, Debug, serde::Deserialize))] +pub struct OtlpUdsOutputSettings { + /// Path to the Unix domain socket for the OTLP endpoint. + pub socket_path: String, + + /// Number of concurrent worker tasks for user trace export. + /// + /// # Default + /// + /// Default value is `2`. + #[serde(default = "OtlpUdsOutputSettings::default_num_tasks")] + pub num_tasks: usize, + + /// Maximum number of spans to drain per batch. + /// + /// # Default + /// + /// Default value is `512`. + #[serde(default = "OtlpUdsOutputSettings::default_max_batch_size")] + pub max_batch_size: usize, +} + +#[cfg(not(feature = "settings"))] +impl Default for OtlpUdsOutputSettings { + fn default() -> Self { + Self { + socket_path: String::new(), + num_tasks: OtlpUdsOutputSettings::default_num_tasks(), + max_batch_size: OtlpUdsOutputSettings::default_max_batch_size(), + } + } +} + +impl OtlpUdsOutputSettings { + const fn default_num_tasks() -> usize { + 2 + } + + const fn default_max_batch_size() -> usize { + 512 + } +} diff --git a/foundations/src/telemetry/tracing/init.rs b/foundations/src/telemetry/tracing/init.rs index 152e9a9..e76b86e 100644 --- a/foundations/src/telemetry/tracing/init.rs +++ b/foundations/src/telemetry/tracing/init.rs @@ -14,6 +14,13 @@ use std::sync::{LazyLock, OnceLock}; #[cfg(feature = "telemetry-otlp-grpc")] use super::output_otlp_grpc; +#[cfg(feature = "user-tracing")] +use super::output_otlp_uds; +#[cfg(feature = "user-tracing")] +use crate::telemetry::settings::{UserTracesOutput, UserTracingSettings}; +#[cfg(feature = "user-tracing")] +use cf_rustracing::sampler::AllSampler; + #[cfg(feature = "testing")] use std::borrow::Cow; @@ -167,3 +174,59 @@ pub(crate) fn init( }); res } + +#[cfg(feature = "user-tracing")] +fn create_tracer_and_span_rx_for_user( + settings: &UserTracingSettings, +) -> BootstrapResult<(Tracer, SharedSpanReceiver)> { + // The user pipeline samples everything it receives — the sampling decision + // is made upstream at activation time, outside foundations. + let sampler = AllSampler.boxed(); + + if let Some(cap) = settings.max_queue_size { + let (consumer, span_rx) = super::channel::channel(cap); + Ok((Tracer::with_consumer(sampler, consumer), span_rx)) + } else { + let (consumer, span_rx) = super::channel::unbounded_channel(); + Ok((Tracer::with_consumer(sampler, consumer), span_rx)) + } +} + +// NOTE: does nothing if user tracing has already been initialized in this process. +#[cfg(feature = "user-tracing")] +pub(crate) fn init_user( + service_info: &ServiceInfo, + settings: &UserTracingSettings, +) -> BootstrapResult>>> { + if !settings.enabled || USER_HARNESS.get().is_some() { + return Ok(None); + } + + let (tracer, span_rx) = create_tracer_and_span_rx_for_user(settings)?; + + let futs = match &settings.output { + UserTracesOutput::OtlpUds(output_settings) => { + output_otlp_uds::start(service_info, output_settings, span_rx)? + } + }; + + // Only spawn the futures if we are actually initializing the harness. + let mut res = Ok(None); + USER_HARNESS.get_or_init(|| { + res = Ok(futs.initializer); + for f in futs.workers { + tokio::spawn(f); + } + + TracingHarness { + tracer, + span_scope_stack: Default::default(), + + #[cfg(feature = "testing")] + test_tracer_scope_stack: Default::default(), + + active_roots: Default::default(), + } + }); + res +} diff --git a/foundations/src/telemetry/tracing/internal.rs b/foundations/src/telemetry/tracing/internal.rs index db2b237..8eaaee0 100644 --- a/foundations/src/telemetry/tracing/internal.rs +++ b/foundations/src/telemetry/tracing/internal.rs @@ -3,6 +3,8 @@ use super::init::TracingHarness; use crate::telemetry::tracing::live::LiveReferenceHandle; use cf_rustracing::sampler::BoxSampler; +#[cfg(feature = "user-tracing")] +use cf_rustracing::span::RoutingMetadata; use cf_rustracing::tag::Tag; use cf_rustracing_jaeger::span::{Span, SpanContext, SpanContextState}; use parking_lot::RwLock; @@ -180,11 +182,15 @@ pub fn write_current_user_span(write_fn: impl FnOnce(&mut Span)) { write_fn(&mut span_guard); } -/// Starts a root user span on the user harness. +/// Starts a root user span on the user harness. `routing` is set at construction and inherited +/// by child spans. #[cfg(feature = "user-tracing")] -pub(crate) fn start_user_trace(name: impl Into>) -> Span { +pub(crate) fn start_user_trace( + name: impl Into>, + routing: RoutingMetadata, +) -> Span { let tracer = TracingHarness::get_user().tracer(); - tracer.span(name).start() + tracer.span(name).routing(routing).start() } pub(super) fn reporter_error(err: impl Error) { diff --git a/foundations/src/telemetry/tracing/mod.rs b/foundations/src/telemetry/tracing/mod.rs index 7e49e8d..9a9bea0 100644 --- a/foundations/src/telemetry/tracing/mod.rs +++ b/foundations/src/telemetry/tracing/mod.rs @@ -18,6 +18,9 @@ mod rate_limit; #[cfg(feature = "telemetry-otlp-grpc")] mod output_otlp_grpc; +#[cfg(feature = "user-tracing")] +mod output_otlp_uds; + use self::init::TracingHarness; use self::internal::{SharedSpan, create_span, current_span, shared_span, span_trace_id}; #[cfg(feature = "user-tracing")] @@ -35,6 +38,9 @@ pub use self::testing::{TestSpan, TestTrace, TestTraceIterator, TestTraceOptions pub use cf_rustracing::tag::TagValue; pub use cf_rustracing_jaeger::span::{Span, SpanContextState as SerializableTraceState, TraceId}; +#[cfg(feature = "user-tracing")] +pub use cf_rustracing::span::RoutingMetadata; + /// Returns active traces as a JSON dump. /// /// The model for this functionality is @@ -485,12 +491,16 @@ pub fn start_trace( SpanScope::new(shared_span(internal::start_trace(root_span_name, options))) } -/// Starts a root user span (per-request activation). +/// Starts a root user span (per-request activation). `routing` is attached at construction and +/// inherited by child spans. /// -/// Without an active root, `user_span` / `add_user_span_tags!` are no-ops. +/// Without an active root, `user_span` / `with_user_span` / `add_user_span_tags!` are no-ops. #[cfg(feature = "user-tracing")] -pub fn start_user_trace(name: impl Into>) -> UserSpanScope { - UserSpanScope::new(user_shared_span(internal::start_user_trace(name))) +pub fn start_user_trace( + name: impl Into>, + routing: RoutingMetadata, +) -> UserSpanScope { + UserSpanScope::new(user_shared_span(internal::start_user_trace(name, routing))) } /// Creates a user span as a child of the current user span, or inactive when no user trace is @@ -1078,20 +1088,30 @@ pub use __test_trace as test_trace; #[cfg(all(test, feature = "user-tracing", feature = "testing"))] mod user_tracing_tests { use super::{ - add_user_span_log_fields, add_user_span_tags, set_user_span_finish_callback, span, - start_user_trace, test_trace, user_span, + RoutingMetadata, add_user_span_log_fields, add_user_span_tags, + set_user_span_finish_callback, span, start_user_trace, test_trace, user_span, }; use crate::telemetry::TelemetryContext; use crate::telemetry::tracing::{Span, TestTraceOptions}; use cf_rustracing::tag::{Tag, TagValue}; + fn routing() -> RoutingMetadata { + RoutingMetadata { + zone_id: 1, + account_id: 2, + workspace_id: "ws".to_string(), + destinations: vec![], + managed: false, + } + } + #[test] fn creation_and_nesting() { let ctx = TelemetryContext::test(); let _scope = ctx.scope(); { - let _root = start_user_trace("request"); + let _root = start_user_trace("request", routing()); let _child = user_span("child"); let _grandchild = user_span("grandchild"); } @@ -1116,7 +1136,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request"); + let _root = start_user_trace("request", routing()); add_user_span_tags!("cache.status" => "HIT"); add_user_span_log_fields!("event" => "lookup"); } @@ -1145,7 +1165,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request"); + let _root = start_user_trace("request", routing()); let _s = span("op").with_user_span(); } @@ -1178,7 +1198,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request"); + let _root = start_user_trace("request", routing()); set_user_span_finish_callback!(|span: &mut Span| { span.set_tag(|| Tag::new("finished", true)); }); @@ -1198,7 +1218,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let root_ctx = start_user_trace("request").into_context(); + let root_ctx = start_user_trace("request", routing()).into_context(); root_ctx .apply(async { let _child = user_span("child"); @@ -1220,7 +1240,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request"); + let _root = start_user_trace("request", routing()); // Propagate via an internal span's context; never touch the user scope. span("internal") @@ -1255,7 +1275,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request"); + let _root = start_user_trace("request", routing()); internal_fn().await; } diff --git a/foundations/src/telemetry/tracing/output_otlp_uds.rs b/foundations/src/telemetry/tracing/output_otlp_uds.rs new file mode 100644 index 0000000..1daae7f --- /dev/null +++ b/foundations/src/telemetry/tracing/output_otlp_uds.rs @@ -0,0 +1,552 @@ +//! [OTLP-over-UDS] output for the user tracing pipeline. +//! +//! Sends protobuf-encoded OTLP trace data over HTTP/1.1 to a Unix domain +//! socket served by a local OTLP endpoint. + +use super::channel::SharedSpanReceiver; +use super::init::TraceOutputFutures; +use super::internal::reporter_error; +use crate::telemetry::otlp_conversion::tracing::convert_span; +use crate::telemetry::settings::OtlpUdsOutputSettings; +use crate::{BootstrapResult, ServiceInfo}; +use anyhow::ensure; +use cf_rustracing::span::RoutingMetadata; +use cf_rustracing_jaeger::span::FinishedSpan; +use futures_util::future::FutureExt as _; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::header::{CONTENT_TYPE, HOST}; +use hyper::{Method, Request, StatusCode}; +use hyper_util::rt::TokioIo; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::trace::v1::ResourceSpans; +use prost::Message as _; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::net::UnixStream; + +const TRACES_PATH: &str = "/v1/traces"; +const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf"; +const TRACE_CONFIG_HEADER: &str = "cf-trace-config"; +const HOST_HEADER_VALUE: &str = "localhost"; + +/// A failure exporting a single OTLP request over the Unix domain socket. +/// +/// This is the concrete error type for the UDS client, mirroring how the +/// existing exporters surface a concrete library error (`cf_rustracing::Error`, +/// `tonic::Status`) to [`reporter_error`]. +#[derive(Debug)] +enum OtlpUdsExportError { + Connect(std::io::Error), + Handshake(hyper::Error), + BuildRequest(http::Error), + Send(hyper::Error), + Status(StatusCode), +} + +impl std::fmt::Display for OtlpUdsExportError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Connect(err) => write!(f, "failed to connect to UDS socket: {err}"), + Self::Handshake(err) => write!(f, "HTTP/1 handshake over UDS failed: {err}"), + Self::BuildRequest(err) => write!(f, "failed to build OTLP UDS request: {err}"), + Self::Send(err) => write!(f, "failed to send OTLP UDS request: {err}"), + Self::Status(status) => { + write!(f, "OTLP UDS receptor returned non-success status: {status}") + } + } + } +} + +impl std::error::Error for OtlpUdsExportError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Connect(err) => Some(err), + Self::Handshake(err) => Some(err), + Self::BuildRequest(err) => Some(err), + Self::Send(err) => Some(err), + Self::Status(_) => None, + } + } +} + +/// Encodes a span's [`RoutingMetadata`] into the `cf-trace-config` header value. +/// +/// This is foundations' contract with the receptor, so the wire shape lives here +/// (not on the `RoutingMetadata` type) and can evolve independently. It's a +/// simple JSON object of the routing fields. +fn encode_trace_config(routing: &RoutingMetadata) -> String { + serde_json::json!({ + "zoneId": routing.zone_id, + "accountId": routing.account_id, + "workspaceId": routing.workspace_id, + "destinations": routing.destinations, + "managed": routing.managed, + }) + .to_string() +} + +/// Exports user tracing spans as OTLP over a Unix domain socket. +#[derive(Debug)] +pub(super) struct OtlpUdsClient { + socket_path: String, +} + +impl OtlpUdsClient { + pub(super) fn new(settings: &OtlpUdsOutputSettings) -> BootstrapResult { + ensure!( + !settings.socket_path.is_empty(), + "user tracing OTLP UDS `socket_path` must be set" + ); + + Ok(Self { + socket_path: settings.socket_path.clone(), + }) + } + + /// Processes a single drained batch of spans: groups them by zone, converts + /// each to OTLP, and POSTs one request per zone. Errors are reported and do + /// not abort the batch. + async fn process_batch(&self, service_info: &ServiceInfo, spans: Vec) { + // Group spans by zone so each request carries a single zone's routing + // metadata in its `cf-trace-config` header. + let mut groups: HashMap)> = HashMap::new(); + + for span in spans { + // Spans without routing metadata aren't user-traced spans we can + // route, so drop them. Read routing before `convert_span` consumes + // the span. + let Some(routing) = span.routing().cloned() else { + continue; + }; + let zone_id = routing.zone_id; + let resource_spans = convert_span(span, service_info); + + groups + .entry(zone_id) + .or_insert_with(|| (routing, Vec::new())) + .1 + .push(resource_spans); + } + + for (_zone_id, (routing, resource_spans)) in groups { + let body = ExportTraceServiceRequest { resource_spans }.encode_to_vec(); + let trace_config = encode_trace_config(&routing); + + if let Err(err) = self.send(body, trace_config).await { + reporter_error(err); + } + } + } + + /// POSTs a single OTLP request body to the receptor, tagged with the + /// per-zone `cf-trace-config` header. + async fn send(&self, body: Vec, trace_config: String) -> Result<(), OtlpUdsExportError> { + let stream = UnixStream::connect(&self.socket_path) + .await + .map_err(OtlpUdsExportError::Connect)?; + + let (mut send_request, conn) = hyper::client::conn::http1::handshake(TokioIo::new(stream)) + .await + .map_err(OtlpUdsExportError::Handshake)?; + + // Drive the connection in the background; request/response errors are + // surfaced via `send_request` below, and the driver completes once the + // exchange is done. + tokio::spawn(async move { + let _ = conn.await; + }); + + let request = Request::builder() + .method(Method::POST) + .uri(TRACES_PATH) + .header(HOST, HOST_HEADER_VALUE) + .header(CONTENT_TYPE, CONTENT_TYPE_PROTOBUF) + .header(TRACE_CONFIG_HEADER, trace_config) + .body(Full::new(Bytes::from(body))) + .map_err(OtlpUdsExportError::BuildRequest)?; + + let response = send_request + .send_request(request) + .await + .map_err(OtlpUdsExportError::Send)?; + + let status = response.status(); + if !status.is_success() { + return Err(OtlpUdsExportError::Status(status)); + } + + Ok(()) + } +} + +pub(super) fn start( + service_info: &ServiceInfo, + settings: &OtlpUdsOutputSettings, + span_rx: SharedSpanReceiver, +) -> BootstrapResult { + let client = Arc::new(OtlpUdsClient::new(settings)?); + let max_batch_size = settings.max_batch_size; + + let workers = (0..settings.num_tasks) + .map(|_| { + let client = Arc::clone(&client); + let service_info = service_info.clone(); + let span_rx = span_rx.clone(); + + async move { do_export(client, service_info, span_rx, max_batch_size).await }.boxed() + }) + .collect(); + + Ok(TraceOutputFutures { + initializer: None, + workers, + }) +} + +/// Drains the span channel and hands each batch to the client for export. +async fn do_export( + client: Arc, + service_info: ServiceInfo, + span_rx: SharedSpanReceiver, + max_batch_size: usize, +) { + let mut batch = Vec::with_capacity(max_batch_size); + + while span_rx.recv_many(&mut batch, max_batch_size).await > 0 { + client + .process_batch(&service_info, std::mem::take(&mut batch)) + .await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use http_body_util::BodyExt as _; + use hyper::Response; + use hyper::body::Incoming; + use hyper::service::service_fn; + use std::convert::Infallible; + use std::path::{Path, PathBuf}; + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio::sync::mpsc; + + struct CapturedRequest { + method: String, + path: String, + host: Option, + content_type: Option, + trace_config: Option, + body: Vec, + } + + /// Binds a UDS "receptor" that captures the first request it receives and + /// replies with `status`. The returned `TempDir` must be kept alive for the + /// socket file to remain valid. + fn spawn_receptor( + status: StatusCode, + ) -> (PathBuf, TempDir, mpsc::UnboundedReceiver) { + let dir = tempfile::tempdir().unwrap(); + let socket_path = dir.path().join("otlp.sock"); + let listener = UnixListener::bind(&socket_path).unwrap(); + + let (tx, rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + + let service = service_fn(move |req: Request| { + let tx = tx.clone(); + + async move { + let (parts, body) = req.into_parts(); + let headers = &parts.headers; + let get = |name: &str| { + headers + .get(name) + .and_then(|v| v.to_str().ok()) + .map(str::to_owned) + }; + + let captured = CapturedRequest { + method: parts.method.to_string(), + path: parts.uri.path().to_string(), + host: get("host"), + content_type: get("content-type"), + trace_config: get(TRACE_CONFIG_HEADER), + body: body.collect().await.unwrap().to_bytes().to_vec(), + }; + + tx.send(captured).ok(); + + Ok::<_, Infallible>( + Response::builder() + .status(status) + .body(Full::new(Bytes::new())) + .unwrap(), + ) + } + }); + + hyper::server::conn::http1::Builder::new() + .serve_connection(TokioIo::new(stream), service) + .await + .ok(); + }); + + (socket_path, dir, rx) + } + + fn settings_for(socket_path: &Path) -> OtlpUdsOutputSettings { + OtlpUdsOutputSettings { + socket_path: socket_path.to_string_lossy().into_owned(), + num_tasks: 1, + max_batch_size: 8, + } + } + + #[tokio::test] + async fn new_rejects_empty_socket_path() { + let err = OtlpUdsClient::new(&OtlpUdsOutputSettings { + socket_path: String::new(), + num_tasks: 1, + max_batch_size: 8, + }) + .unwrap_err(); + + assert!(err.to_string().contains("socket_path")); + } + + #[tokio::test] + async fn send_posts_otlp_with_headers_and_body() { + let (socket_path, _dir, mut rx) = spawn_receptor(StatusCode::OK); + + let client = OtlpUdsClient::new(&settings_for(&socket_path)).unwrap(); + + let body = b"hello-otlp".to_vec(); + let trace_config = r#"{"zone_id":"z1"}"#.to_string(); + + client + .send(body.clone(), trace_config.clone()) + .await + .unwrap(); + + let captured = rx.recv().await.unwrap(); + assert_eq!(captured.method, "POST"); + assert_eq!(captured.path, TRACES_PATH); + assert_eq!(captured.host.as_deref(), Some(HOST_HEADER_VALUE)); + assert_eq!( + captured.content_type.as_deref(), + Some(CONTENT_TYPE_PROTOBUF) + ); + assert_eq!( + captured.trace_config.as_deref(), + Some(trace_config.as_str()) + ); + assert_eq!(captured.body, body); + } + + #[tokio::test] + async fn send_errors_on_non_success_status() { + let (socket_path, _dir, _rx) = spawn_receptor(StatusCode::INTERNAL_SERVER_ERROR); + + let client = OtlpUdsClient::new(&settings_for(&socket_path)).unwrap(); + + let err = client + .send(b"x".to_vec(), "{}".to_string()) + .await + .unwrap_err(); + + assert!(matches!( + err, + OtlpUdsExportError::Status(StatusCode::INTERNAL_SERVER_ERROR) + )); + assert!(err.to_string().contains("non-success status")); + } + + // Drives the full path: a span produced through a tracer (with routing set) + // flows through the channel, is converted + encoded by `process_batch`, and + // arrives at the receptor with its routing in the `cf-trace-config` header. + #[tokio::test] + async fn process_batch_sends_converted_spans() { + use super::super::channel::unbounded_channel; + use cf_rustracing::Tracer; + use cf_rustracing::sampler::AllSampler; + + let (socket_path, _dir, mut rx) = spawn_receptor(StatusCode::OK); + + let (sender, span_rx) = unbounded_channel(); + + // Produce one finished span with routing, then drop the tracer so the + // channel closes and the worker loop terminates after draining. + { + let tracer = Tracer::with_consumer(AllSampler, sender); + let _span = tracer + .span("user-root") + .routing(RoutingMetadata { + zone_id: 12345, + account_id: 42, + workspace_id: "ws-1".to_string(), + destinations: vec!["dest-a".to_string()], + managed: true, + }) + .start(); + } + + let service_info = crate::service_info!(); + let futs = start(&service_info, &settings_for(&socket_path), span_rx).unwrap(); + for worker in futs.workers { + tokio::spawn(worker); + } + + let captured = rx.recv().await.unwrap(); + assert_eq!(captured.method, "POST"); + assert_eq!(captured.path, TRACES_PATH); + assert_eq!( + captured.content_type.as_deref(), + Some(CONTENT_TYPE_PROTOBUF) + ); + let trace_config: serde_json::Value = + serde_json::from_str(captured.trace_config.as_deref().unwrap()).unwrap(); + assert_eq!(trace_config["zoneId"], 12345); + assert_eq!(trace_config["accountId"], 42); + assert_eq!(trace_config["workspaceId"], "ws-1"); + assert_eq!(trace_config["destinations"], serde_json::json!(["dest-a"])); + assert_eq!(trace_config["managed"], true); + // Body is a protobuf-encoded `ExportTraceServiceRequest`. + assert!(!captured.body.is_empty()); + } + + // Full producer path: `init_user` stands up `USER_HARNESS` + the OTLP/UDS exporter, then + // `start_user_trace` + `user_span` + `add_user_span_tags!` produce spans that reach the + // receptor with routing in the `cf-trace-config` header. (nextest isolates this in its own + // process, so the one-shot `USER_HARNESS` is fine.) + #[tokio::test] + async fn user_pipeline_exports_with_routing() { + use crate::telemetry::settings::{UserTracesOutput, UserTracingSettings}; + use crate::telemetry::tracing::{add_user_span_tags, start_user_trace, user_span}; + use opentelemetry_proto::tonic::common::v1::any_value::Value; + use prost::Message as _; + + let (socket_path, _dir, mut rx) = spawn_receptor(StatusCode::OK); + + let settings = UserTracingSettings { + enabled: true, + max_queue_size: None, + output: UserTracesOutput::OtlpUds(settings_for(&socket_path)), + }; + + let service_info = crate::service_info!(); + crate::telemetry::tracing::init::init_user(&service_info, &settings).unwrap(); + + { + let _root = start_user_trace( + "request", + RoutingMetadata { + zone_id: 12345, + account_id: 42, + workspace_id: "ws-1".to_string(), + destinations: vec!["dest-a".to_string()], + managed: true, + }, + ); + + let _child = user_span("child"); + add_user_span_tags!("cache.status" => "HIT"); + } + + let captured = rx.recv().await.unwrap(); + assert_eq!(captured.path, TRACES_PATH); + + let trace_config: serde_json::Value = + serde_json::from_str(captured.trace_config.as_deref().unwrap()).unwrap(); + assert_eq!(trace_config["zoneId"], 12345); + assert_eq!(trace_config["accountId"], 42); + assert_eq!(trace_config["workspaceId"], "ws-1"); + assert_eq!(trace_config["destinations"], serde_json::json!(["dest-a"])); + assert_eq!(trace_config["managed"], true); + + // Decode the OTLP body and verify the producer API actually emitted the expected spans. + let req = ExportTraceServiceRequest::decode(captured.body.as_slice()).unwrap(); + let spans: Vec<_> = req + .resource_spans + .iter() + .flat_map(|rs| &rs.scope_spans) + .flat_map(|ss| &ss.spans) + .collect(); + + let root = spans + .iter() + .find(|s| s.name == "request") + .expect("root span exported"); + let child = spans + .iter() + .find(|s| s.name == "child") + .expect("child span exported"); + + // `add_user_span_tags!` wrote to the current user span (the child), not the root. + let tag = child + .attributes + .iter() + .find(|kv| kv.key == "cache.status") + .expect("cache.status tag present on child"); + assert!(matches!( + &tag.value.as_ref().unwrap().value, + Some(Value::StringValue(v)) if v == "HIT" + )); + assert!(!root.attributes.iter().any(|kv| kv.key == "cache.status")); + + // Correct hierarchy: child is a child of root within the same trace. + assert_eq!(child.trace_id, root.trace_id); + assert_eq!(child.parent_span_id, root.span_id); + } + + // Routing set on the root is inherited by all descendants, so a grandchild also exports + // (the exporter drops spans without routing). + #[tokio::test] + async fn user_pipeline_inherits_routing_to_descendants() { + use crate::telemetry::settings::{UserTracesOutput, UserTracingSettings}; + use crate::telemetry::tracing::{start_user_trace, user_span}; + use prost::Message as _; + + let (socket_path, _dir, mut rx) = spawn_receptor(StatusCode::OK); + + let settings = UserTracingSettings { + enabled: true, + max_queue_size: None, + output: UserTracesOutput::OtlpUds(settings_for(&socket_path)), + }; + crate::telemetry::tracing::init::init_user(&crate::service_info!(), &settings).unwrap(); + + { + let _root = start_user_trace( + "request", + RoutingMetadata { + zone_id: 12345, + account_id: 42, + workspace_id: "ws-1".to_string(), + destinations: vec!["dest-a".to_string()], + managed: true, + }, + ); + let _child = user_span("child"); + let _grandchild = user_span("grandchild"); + } + + let captured = rx.recv().await.unwrap(); + let req = ExportTraceServiceRequest::decode(captured.body.as_slice()).unwrap(); + let names: Vec<&str> = req + .resource_spans + .iter() + .flat_map(|rs| &rs.scope_spans) + .flat_map(|ss| &ss.spans) + .map(|s| s.name.as_str()) + .collect(); + + assert!(names.contains(&"request")); + assert!(names.contains(&"child")); + assert!(names.contains(&"grandchild")); + } +} From 0b7d4e8b8f5263298ab92637a8ad9283ce758983 Mon Sep 17 00:00:00 2001 From: Mar Witek Date: Wed, 24 Jun 2026 03:33:40 +0200 Subject: [PATCH 7/7] Add W3C trace propagation for user spans Adds the `TraceparentContext` W3C parser and wires it through: `start_user_trace` gains an optional `inbound` traceparent that stitches the user root onto the upstream trace (shared trace id, inbound parent), and `user_tracing::w3c_traceparent()` derives the header for the current user span for outbound propagation. Covered by parser unit tests plus continuation tests through the test harness and the OTLP/UDS producer path. --- Cargo.toml | 1 + foundations/Cargo.toml | 2 + foundations/src/telemetry/tracing/internal.rs | 25 +- foundations/src/telemetry/tracing/mod.rs | 210 +++++++++++++- .../src/telemetry/tracing/output_otlp_uds.rs | 51 ++++ .../src/telemetry/tracing/traceparent.rs | 267 ++++++++++++++++++ 6 files changed, 541 insertions(+), 15 deletions(-) create mode 100644 foundations/src/telemetry/tracing/traceparent.rs diff --git a/Cargo.toml b/Cargo.toml index 4d0e797..e5560c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ darling = "0.23.0" erased-serde = "0.4.10" futures-util = "0.3.32" governor = "0.10.4" +hex = "0.4.3" http = "1.4.0" http-body-util = "0.1.3" hyper = { version = "1.9.0", default-features = false } diff --git a/foundations/Cargo.toml b/foundations/Cargo.toml index 4511d71..283452c 100644 --- a/foundations/Cargo.toml +++ b/foundations/Cargo.toml @@ -151,6 +151,7 @@ tracing = [ # Enables distributed user tracing functionality. user-tracing = [ "tracing", + "dep:hex", "dep:hyper", "hyper/client", "dep:hyper-util", @@ -235,6 +236,7 @@ crossbeam-utils = { workspace = true, optional = true } erased-serde = { workspace = true, optional = true } futures-util = { workspace = true, optional = true } governor = { workspace = true, optional = true } +hex = { workspace = true, optional = true } http = { workspace = true, optional = true } http-body-util = { workspace = true, optional = true } hyper = { workspace = true, optional = true, features = ["http1", "server"] } diff --git a/foundations/src/telemetry/tracing/internal.rs b/foundations/src/telemetry/tracing/internal.rs index 8eaaee0..d7ba92c 100644 --- a/foundations/src/telemetry/tracing/internal.rs +++ b/foundations/src/telemetry/tracing/internal.rs @@ -6,6 +6,8 @@ use cf_rustracing::sampler::BoxSampler; #[cfg(feature = "user-tracing")] use cf_rustracing::span::RoutingMetadata; use cf_rustracing::tag::Tag; +#[cfg(feature = "user-tracing")] +use cf_rustracing_jaeger::span::TraceId; use cf_rustracing_jaeger::span::{Span, SpanContext, SpanContextState}; use parking_lot::RwLock; use rand::RngExt as _; @@ -182,15 +184,32 @@ pub fn write_current_user_span(write_fn: impl FnOnce(&mut Span)) { write_fn(&mut span_guard); } -/// Starts a root user span on the user harness. `routing` is set at construction and inherited -/// by child spans. +/// Starts a root user span on the user harness, optionally continuing the inbound W3C trace. +/// `routing` is set at construction and inherited by child spans. #[cfg(feature = "user-tracing")] pub(crate) fn start_user_trace( name: impl Into>, routing: RoutingMetadata, + inbound: Option, ) -> Span { let tracer = TracingHarness::get_user().tracer(); - tracer.span(name).routing(routing).start() + let mut builder = tracer.span(name).routing(routing); + + if let Some(tp) = inbound { + let trace_id = TraceId { + high: u64::from_be_bytes(tp.trace_id[..8].try_into().unwrap()), + low: u64::from_be_bytes(tp.trace_id[8..].try_into().unwrap()), + }; + let state = SpanContextState::new( + trace_id, + u64::from_be_bytes(tp.parent_id), + tp.trace_flags, + String::new(), + ); + builder = builder.child_of(&SpanContext::new(state, vec![])); + } + + builder.start() } pub(super) fn reporter_error(err: impl Error) { diff --git a/foundations/src/telemetry/tracing/mod.rs b/foundations/src/telemetry/tracing/mod.rs index 9a9bea0..cf35958 100644 --- a/foundations/src/telemetry/tracing/mod.rs +++ b/foundations/src/telemetry/tracing/mod.rs @@ -21,6 +21,9 @@ mod output_otlp_grpc; #[cfg(feature = "user-tracing")] mod output_otlp_uds; +#[cfg(feature = "user-tracing")] +mod traceparent; + use self::init::TracingHarness; use self::internal::{SharedSpan, create_span, current_span, shared_span, span_trace_id}; #[cfg(feature = "user-tracing")] @@ -38,6 +41,9 @@ pub use self::testing::{TestSpan, TestTrace, TestTraceIterator, TestTraceOptions pub use cf_rustracing::tag::TagValue; pub use cf_rustracing_jaeger::span::{Span, SpanContextState as SerializableTraceState, TraceId}; +#[cfg(feature = "user-tracing")] +pub use self::traceparent::TraceparentContext; + #[cfg(feature = "user-tracing")] pub use cf_rustracing::span::RoutingMetadata; @@ -287,6 +293,7 @@ impl UserSpanScope { /// Converts the user span scope to a [`TelemetryContext`] that can be applied to a future. pub fn into_context(self) -> TelemetryContext { let mut ctx = TelemetryContext::current(); + ctx.user_span = Some(self.span); ctx @@ -491,16 +498,19 @@ pub fn start_trace( SpanScope::new(shared_span(internal::start_trace(root_span_name, options))) } -/// Starts a root user span (per-request activation). `routing` is attached at construction and -/// inherited by child spans. +/// Starts a root user span (per-request activation), optionally continuing the inbound W3C trace +/// from `inbound`. `routing` is attached at construction and inherited by child spans. /// /// Without an active root, `user_span` / `with_user_span` / `add_user_span_tags!` are no-ops. #[cfg(feature = "user-tracing")] pub fn start_user_trace( name: impl Into>, routing: RoutingMetadata, + inbound: Option, ) -> UserSpanScope { - UserSpanScope::new(user_shared_span(internal::start_user_trace(name, routing))) + UserSpanScope::new(user_shared_span(internal::start_user_trace( + name, routing, inbound, + ))) } /// Creates a user span as a child of the current user span, or inactive when no user trace is @@ -510,6 +520,28 @@ pub fn user_span(name: impl Into>) -> UserSpanScope { UserSpanScope::new(create_user_span(name)) } +/// Introspection and outbound-propagation helpers for the user-tracing pipeline. +#[cfg(feature = "user-tracing")] +pub mod user_tracing { + use super::internal::current_user_span; + + /// W3C `traceparent` for the current user span, for outbound propagation to the next hop. + /// Span-derived (parent-id is the current user span); `None` when no user trace is active. + pub fn w3c_traceparent() -> Option { + current_user_span()?.inner.with_read(|s| { + let state = s.context()?.state(); + + Some(format!( + "00-{:0>16x}{:0>16x}-{:0>16x}-{:0>2x}", + state.trace_id().high, + state.trace_id().low, + state.span_id(), + state.flags() + )) + }) + } +} + /// Returns the current span as a raw [rustracing] crate's `Span` that is used by Foundations internally. /// /// Can be used to propagate the tracing context to libraries that don't use Foundations' @@ -1088,8 +1120,8 @@ pub use __test_trace as test_trace; #[cfg(all(test, feature = "user-tracing", feature = "testing"))] mod user_tracing_tests { use super::{ - RoutingMetadata, add_user_span_log_fields, add_user_span_tags, - set_user_span_finish_callback, span, start_user_trace, test_trace, user_span, + RoutingMetadata, TraceparentContext, add_user_span_log_fields, add_user_span_tags, + set_user_span_finish_callback, span, start_user_trace, test_trace, user_span, user_tracing, }; use crate::telemetry::TelemetryContext; use crate::telemetry::tracing::{Span, TestTraceOptions}; @@ -1111,7 +1143,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request", routing()); + let _root = start_user_trace("request", routing(), None); let _child = user_span("child"); let _grandchild = user_span("grandchild"); } @@ -1136,7 +1168,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request", routing()); + let _root = start_user_trace("request", routing(), None); add_user_span_tags!("cache.status" => "HIT"); add_user_span_log_fields!("event" => "lookup"); } @@ -1165,7 +1197,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request", routing()); + let _root = start_user_trace("request", routing(), None); let _s = span("op").with_user_span(); } @@ -1178,6 +1210,99 @@ mod user_tracing_tests { ); } + // `with_user_span()` carried across an `.await` via its own scope's `into_context()`: the + // parallel user span survives the boundary and a second `with_user_span()` nests under it. + #[tokio::test] + async fn with_user_span_carried_across_await() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request", routing(), None); + + span("a") + .with_user_span() + .into_context() + .apply(async { + let _c = span("c").with_user_span(); + }) + .await; + } + + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "a" => { "c" } } }] + ); + assert_eq!( + ctx.traces(Default::default()), + vec![test_trace! { "a" => { "c" } }] + ); + } + + // Same propagation, but `into_context()` is taken on an *internal-only* span (`b`) between two + // user spans. The ambient user span (`a`) rides along, so the far-side user span (`c`) nests + // under `a` — the user tree skips `b`, which exists only in the internal tree. + #[tokio::test] + async fn with_user_span_carried_via_internal_span() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request", routing(), None); + let _a = span("a").with_user_span(); + + span("b") + .into_context() + .apply(async { + let _c = span("c").with_user_span(); + }) + .await; + } + + // User tree: c under a under the root; b is internal-only and absent here. + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "a" => { "c" } } }] + ); + // Internal tree: a -> b -> c. + assert_eq!( + ctx.traces(Default::default()), + vec![test_trace! { "a" => { "b" => { "c" } } }] + ); + } + + #[test] + fn continues_inbound_trace() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + let inbound = + TraceparentContext::parse(b"00-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-01") + .unwrap(); + let _root = start_user_trace("request", routing(), Some(inbound)); + + let out = user_tracing::w3c_traceparent().unwrap(); + assert!(out.starts_with("00-11223344556677889900aabbccddeeff-")); + } + + // Outbound: `w3c_traceparent()` is derived from the *current* user span — a child shares the + // root's 128-bit trace id but reports its own span id. + #[test] + fn outbound_traceparent_is_span_derived() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + let _root = start_user_trace("request", routing(), None); + let root_tp = user_tracing::w3c_traceparent().expect("root traceparent"); + + let _child = user_span("child"); + let child_tp = user_tracing::w3c_traceparent().expect("child traceparent"); + + // Same version + trace id ("00-" + 32 hex = 35 chars); different span id. + assert_eq!(&root_tp[..35], &child_tp[..35]); + assert_ne!(root_tp, child_tp); + } + #[test] fn no_op_without_activation() { let ctx = TelemetryContext::test(); @@ -1198,7 +1323,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request", routing()); + let _root = start_user_trace("request", routing(), None); set_user_span_finish_callback!(|span: &mut Span| { span.set_tag(|| Tag::new("finished", true)); }); @@ -1218,7 +1343,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let root_ctx = start_user_trace("request", routing()).into_context(); + let root_ctx = start_user_trace("request", routing(), None).into_context(); root_ctx .apply(async { let _child = user_span("child"); @@ -1240,7 +1365,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request", routing()); + let _root = start_user_trace("request", routing(), None); // Propagate via an internal span's context; never touch the user scope. span("internal") @@ -1263,6 +1388,67 @@ mod user_tracing_tests { ); } + // A real task boundary (`tokio::spawn`): the held context carries the user span into a + // separate task, where a child nests under the root. + #[tokio::test] + async fn user_span_carried_across_spawn() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let root_ctx = start_user_trace("request", routing(), None).into_context(); + tokio::spawn(root_ctx.apply(async { + let _child = user_span("child"); + })) + .await + .unwrap(); + } + + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "child" } }] + ); + } + + // Forking the *internal* trace must preserve the active user span. + #[test] + fn user_span_survives_forked_trace() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request", routing(), None); + let _forked = TelemetryContext::current() + .with_forked_trace("fork") + .scope(); + let _child = user_span("child"); + } + + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "child" } }] + ); + } + + // Forking the log must likewise preserve the active user span. + #[cfg(feature = "logging")] + #[test] + fn user_span_survives_forked_log() { + let ctx = TelemetryContext::test(); + let _scope = ctx.scope(); + + { + let _root = start_user_trace("request", routing(), None); + let _forked = TelemetryContext::current().with_forked_log().scope(); + let _child = user_span("child"); + } + + assert_eq!( + ctx.user_traces(Default::default()), + vec![test_trace! { "request" => { "child" } }] + ); + } + // Same property via the `#[span_fn]` macro path (a plain internal-traced async fn). #[crate::telemetry::tracing::span_fn("internal_fn", crate_path = "crate")] async fn internal_fn() { @@ -1275,7 +1461,7 @@ mod user_tracing_tests { let _scope = ctx.scope(); { - let _root = start_user_trace("request", routing()); + let _root = start_user_trace("request", routing(), None); internal_fn().await; } diff --git a/foundations/src/telemetry/tracing/output_otlp_uds.rs b/foundations/src/telemetry/tracing/output_otlp_uds.rs index 1daae7f..e2ca630 100644 --- a/foundations/src/telemetry/tracing/output_otlp_uds.rs +++ b/foundations/src/telemetry/tracing/output_otlp_uds.rs @@ -451,6 +451,7 @@ mod tests { destinations: vec!["dest-a".to_string()], managed: true, }, + None, ); let _child = user_span("child"); @@ -503,6 +504,55 @@ mod tests { assert_eq!(child.parent_span_id, root.span_id); } + // Stitching: a root started with an inbound `traceparent` continues that trace on the wire + // (same 128-bit trace id, and its parent is the inbound parent span id). + #[tokio::test] + async fn user_pipeline_continues_inbound_trace() { + use crate::telemetry::settings::{UserTracesOutput, UserTracingSettings}; + use crate::telemetry::tracing::{TraceparentContext, start_user_trace}; + use prost::Message as _; + + let (socket_path, _dir, mut rx) = spawn_receptor(StatusCode::OK); + + let settings = UserTracingSettings { + enabled: true, + max_queue_size: None, + output: UserTracesOutput::OtlpUds(settings_for(&socket_path)), + }; + crate::telemetry::tracing::init::init_user(&crate::service_info!(), &settings).unwrap(); + + let inbound = + TraceparentContext::parse(b"00-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-01") + .unwrap(); + + { + let _root = start_user_trace( + "request", + RoutingMetadata { + zone_id: 12345, + account_id: 42, + workspace_id: "ws-1".to_string(), + destinations: vec!["dest-a".to_string()], + managed: true, + }, + Some(inbound), + ); + } + + let captured = rx.recv().await.unwrap(); + let req = ExportTraceServiceRequest::decode(captured.body.as_slice()).unwrap(); + let root = req + .resource_spans + .iter() + .flat_map(|rs| &rs.scope_spans) + .flat_map(|ss| &ss.spans) + .find(|s| s.name == "request") + .expect("root span exported"); + + assert_eq!(root.trace_id, inbound.trace_id); + assert_eq!(root.parent_span_id, inbound.parent_id); + } + // Routing set on the root is inherited by all descendants, so a grandchild also exports // (the exporter drops spans without routing). #[tokio::test] @@ -530,6 +580,7 @@ mod tests { destinations: vec!["dest-a".to_string()], managed: true, }, + None, ); let _child = user_span("child"); let _grandchild = user_span("grandchild"); diff --git a/foundations/src/telemetry/tracing/traceparent.rs b/foundations/src/telemetry/tracing/traceparent.rs new file mode 100644 index 0000000..50f3f78 --- /dev/null +++ b/foundations/src/telemetry/tracing/traceparent.rs @@ -0,0 +1,267 @@ +//! W3C Trace Context `traceparent` header parsing. +//! +//! Format: `{version}-{trace-id}-{parent-id}-{trace-flags}` (55 bytes, all ASCII). +//! +//! Example: `00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01` +//! +//! See . +//! +//! This is the inbound counterpart to the outbound formatter +//! [`w3c_traceparent`](crate::telemetry::tracing::user_tracing::w3c_traceparent): it parses the +//! W3C `traceparent` carried in the user-tracing control header so an inbound trace can be +//! continued by [`start_user_trace`](crate::telemetry::tracing::start_user_trace). + +/// Parsed components of a valid W3C `traceparent` header value. +/// +/// Constructed via [`TraceparentContext::parse`]. Round-trippable via +/// [`TraceparentContext::to_traceparent_string`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TraceparentContext { + /// W3C trace-context version. Only `0` (the `00` wire version) is currently accepted. + pub version: u8, + /// 16-byte (128-bit) trace identifier. + pub trace_id: [u8; 16], + /// 8-byte (64-bit) parent span identifier. + pub parent_id: [u8; 8], + /// Trace flags bitfield; bit 0 is the "sampled" flag. + pub trace_flags: u8, +} + +impl TraceparentContext { + /// Parse a `traceparent` header value. + /// + /// Returns `None` on any of: + /// - Not valid UTF-8 + /// - Wrong total length or missing/extra dashes + /// - Wrong field widths (version=2, trace-id=32, parent-id=16, flags=2) + /// - Non-hex characters in any field + /// - Unsupported version (only `00` accepted per current W3C spec) + /// - All-zero trace-id or parent-id (invalid per spec) + pub fn parse(value: &[u8]) -> Option { + let s = std::str::from_utf8(value).ok()?; + + let mut parts = s.splitn(4, '-'); + let version_str = parts.next()?; + let trace_id_str = parts.next()?; + let parent_id_str = parts.next()?; + let flags_str = parts.next()?; + + // Trailing content (extra dashes) ends up in `flags_str` since `splitn(4)` + // packs any remainder into the last segment; the length check below catches it. + if version_str.len() != 2 + || trace_id_str.len() != 32 + || parent_id_str.len() != 16 + || flags_str.len() != 2 + { + return None; + } + + let version = u8::from_str_radix(version_str, 16).ok()?; + // Only version 00 is supported. + if version != 0 { + return None; + } + + let trace_flags = u8::from_str_radix(flags_str, 16).ok()?; + + let mut trace_id = [0u8; 16]; + hex::decode_to_slice(trace_id_str, &mut trace_id).ok()?; + + let mut parent_id = [0u8; 8]; + hex::decode_to_slice(parent_id_str, &mut parent_id).ok()?; + + // Per W3C spec, all-zero trace-id and parent-id are invalid. + if trace_id == [0u8; 16] || parent_id == [0u8; 8] { + return None; + } + + Some(Self { + version, + trace_id, + parent_id, + trace_flags, + }) + } + + /// Returns true if the sampled bit (bit 0) of `trace_flags` is set, + /// indicating the caller decided this trace should be recorded. + pub fn is_sampled(&self) -> bool { + self.trace_flags & 0x01 != 0 + } + + /// Re-serialize the parsed context back to a canonical W3C `traceparent` string. + pub fn to_traceparent_string(self) -> String { + let mut buf = vec![0u8; 55]; + hex::encode_to_slice([self.version], &mut buf[0..2]).expect("size=2"); + buf[2] = b'-'; + hex::encode_to_slice(self.trace_id, &mut buf[3..35]).expect("size=32"); + buf[35] = b'-'; + hex::encode_to_slice(self.parent_id, &mut buf[36..52]).expect("size=16"); + buf[52] = b'-'; + hex::encode_to_slice([self.trace_flags], &mut buf[53..55]).expect("size=2"); + String::from_utf8(buf).expect("valid UTF-8") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const VALID: &str = "00-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-01"; + + #[test] + fn parse_valid() { + let tp = TraceparentContext::parse(VALID.as_bytes()).expect("valid"); + assert_eq!(tp.version, 0); + assert_eq!( + tp.trace_id, + [ + 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00, 0xaa, 0xbb, 0xcc, 0xdd, + 0xee, 0xff, + ] + ); + assert_eq!( + tp.parent_id, + [0xa1, 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x07, 0x18] + ); + assert_eq!(tp.trace_flags, 0x01); + assert!(tp.is_sampled()); + } + + #[test] + fn roundtrip() { + let tp = TraceparentContext::parse(VALID.as_bytes()).expect("valid"); + assert_eq!(tp.to_traceparent_string(), VALID); + } + + #[test] + fn uppercase_hex_accepted() { + // Uppercase input accepted; output is always lowercase. + let uppercase = "00-11223344556677889900AABBCCDDEEFF-A1B2C3D4E5F60718-01"; + let tp = TraceparentContext::parse(uppercase.as_bytes()).expect("uppercase accepted"); + assert_eq!(tp.to_traceparent_string(), VALID); + } + + #[test] + fn sampled_with_extra_flags() { + // Bit 0 = sampled; extra bits (e.g. 0x02) are preserved and propagated. + let s = "00-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-03"; + let tp = TraceparentContext::parse(s.as_bytes()).expect("valid"); + assert_eq!(tp.trace_flags, 0x03); + assert!(tp.is_sampled()); + } + + #[test] + fn unsampled_is_valid() { + let s = "00-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-00"; + let tp = TraceparentContext::parse(s.as_bytes()).expect("valid"); + assert_eq!(tp.trace_flags, 0x00); + assert!(!tp.is_sampled()); + } + + #[test] + fn empty() { + assert!(TraceparentContext::parse(b"").is_none()); + } + + #[test] + fn degenerate() { + assert!(TraceparentContext::parse(b"---").is_none()); + assert!(TraceparentContext::parse(b"00-1-1-00").is_none()); + } + + #[test] + fn wrong_total_length() { + // Too short (flags truncated) + assert!( + TraceparentContext::parse(b"00-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-0") + .is_none() + ); + // Too long (extra char on flags) + assert!( + TraceparentContext::parse(b"00-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-012") + .is_none() + ); + } + + #[test] + fn wrong_field_sizes() { + // version too short + assert!( + TraceparentContext::parse(b"0-11223344556677889900aabbccddeeff0-a1b2c3d4e5f60718-01") + .is_none() + ); + // trace-id too long + assert!( + TraceparentContext::parse(b"00-11223344556677889900aabbccddeeff0-1b2c3d4e5f60718-01") + .is_none() + ); + // trace-id too short + assert!( + TraceparentContext::parse(b"00-11223344556677889900aabbccddee-a1b2c3d4e5f6071800-01") + .is_none() + ); + // parent-id too long + assert!( + TraceparentContext::parse(b"00-1223344556677889900aabbccddeeff-a1b2c3d4e5f607180-01") + .is_none() + ); + // parent-id too short + assert!( + TraceparentContext::parse(b"00-112233445566778899900aabbccddeeff-1b2c3d4e5f6071-01") + .is_none() + ); + } + + #[test] + fn empty_fields() { + assert!(TraceparentContext::parse(b"00--a1b2c3d4e5f60718-01").is_none()); + assert!(TraceparentContext::parse(b"00-11223344556677889900aabbccddeeff--01").is_none()); + } + + #[test] + fn bad_hex() { + assert!( + TraceparentContext::parse(b"0g-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-01") + .is_none() + ); + assert!( + TraceparentContext::parse(b"00-x1223344556677889900aabbccddeeff-a1b2c3d4e5f60718-01") + .is_none() + ); + assert!( + TraceparentContext::parse(b"00-11223344556677889900aabbccddeeff-a1b2c3d4e5f6071x-01") + .is_none() + ); + } + + #[test] + fn unsupported_version() { + // Future versions rejected: current spec only defines version 00. + assert!( + TraceparentContext::parse(b"01-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-01") + .is_none() + ); + assert!( + TraceparentContext::parse(b"ff-11223344556677889900aabbccddeeff-a1b2c3d4e5f60718-01") + .is_none() + ); + } + + #[test] + fn all_zero_trace_id_rejected() { + let s = "00-00000000000000000000000000000000-a1b2c3d4e5f60718-01"; + assert!(TraceparentContext::parse(s.as_bytes()).is_none()); + } + + #[test] + fn all_zero_parent_id_rejected() { + let s = "00-11223344556677889900aabbccddeeff-0000000000000000-01"; + assert!(TraceparentContext::parse(s.as_bytes()).is_none()); + } + + #[test] + fn non_utf8() { + assert!(TraceparentContext::parse(&[0xff, 0xfe, 0xfd]).is_none()); + } +}